Kafka JDBC source connector to Spring Boot Java

Hi,

My requirement is to send data from the Kafka JDBC connector to a Kafka API using a Spring Boot Java application.

Kafka JDBC Source Connector:

{
name= my-jdbc-connector1
config= {
name=kafkasource
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
name=oracle-kafka
connection.url=jdbc:oracle:thin:@localhost:1521/orcl
connection.user=kafka
connection.password=check
tasks.max=1
topic.prefix= sqlkafkapost1
mode= timestamp+incrementing
query=SELECT EMPNO,ENAME,JOININGDATE,RELEASEDATE FROM EMP
timestamp.column.name=RELEASEDATE
incrementing.column.name=EMPNO
poll.interval.ms=1000
numeric.mapping=best_fit
errors.tolerance=all
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
dialect.name=OracleDatabaseDialect
}
}

I can see my data passing through the Kafka topic, as shown below:

c:\kafka\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic sqlkafkapost1
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"EMPNO"},{"type":"string","optional":true,"field":"ENAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"JOININGDATE"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"RELEASEDATE"}],"optional":false},"payload":{"EMPNO":129,"ENAME":"venkat","JOININGDATE":1644667994000,"RELEASEDATE":1713830400000}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"EMPNO"},{"type":"string","optional":true,"field":"ENAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"JOININGDATE"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"RELEASEDATE"}],"optional":false},"payload":{"EMPNO":130,"ENAME":"venkat","JOININGDATE":1644667994000,"RELEASEDATE":1713830400000}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"EMPNO"},{"type":"string","optional":true,"field":"ENAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"JOININGDATE"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"RELEASEDATE"}],"optional":false},"payload":{"EMPNO":131,"ENAME":"venkat","JOININGDATE":1644667994000,"RELEASEDATE":1713830400000}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"EMPNO"},{"type":"string","optional":true,"field":"ENAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"JOININGDATE"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"RELEASEDATE"}],"optional":false},"payload":{"EMPNO":132,"ENAME":"venkat","JOININGDATE":1644667994000,"RELEASEDATE":1713830400000}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"EMPNO"},{"type":"string","optional":true,"field":"ENAME"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"JOININGDATE"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"RELEASEDATE"}],"optional":false},"payload":{"EMPNO":133,"ENAME":"gopi","JOININGDATE":1644667994000,"RELEASEDATE":1713830400000}}

I have written the following code in Spring Boot:

@Builder
@Data
@AllArgsConstructor
@NoArgsConstructor

public class SalesOrder {
private Integer EMPNO;
private String ENAME;
private Timestamp JOININGDATE;
private Timestamp RELEASEDATE;

}
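
A side note on the two date fields: in the console output they arrive as int64 numbers tagged with the Connect Timestamp logical type, which is epoch milliseconds. The sketch below is only an illustration of turning such a number into a timestamp object. It assumes the Timestamp in SalesOrder is java.sql.Timestamp, and the class and method names (ConnectTimestampSketch, toSqlTimestamp) are made up for this example.

```java
import java.sql.Timestamp;
import java.time.Instant;

// Hypothetical helper, not part of the application above.
class ConnectTimestampSketch {

    // Interprets a Connect Timestamp value (epoch milliseconds) as a java.sql.Timestamp.
    // Assumption: SalesOrder uses java.sql.Timestamp for its date fields.
    static Timestamp toSqlTimestamp(long epochMillis) {
        return Timestamp.from(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // JOININGDATE value copied from the console consumer output.
        long joiningDate = 1644667994000L;
        Timestamp ts = toSqlTimestamp(joiningDate);

        // Printing the Instant shows the value in UTC, independent of the JVM time zone.
        System.out.println(ts.toInstant());
    }
}
```

Whether the JSON mapper performs this conversion on its own depends on how it is configured, so this may or may not be needed in practice.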

@Service
public class KafkaConsumerService {

@KafkaListener(topics="sqlkafkapost1", groupId="Group100", containerFactory = "salesorderListener")
public void listen(@RequestBody SalesOrder salesOrder)
{
    System.out.println("Received ' " + salesOrder + " ' from the Topic. ");
}

}

The Spring Boot listener receives the records, but every field is null:

2024-06-11T12:39:26.041+05:30 INFO 36860 --- [salescon] [ntainer#0-0-C-1] k.c.c.i.ConsumerRebalanceListenerInvoker : [Consumer clientId=consumer-Group100-1, groupId=Group100] Adding newly assigned partitions: sqlkafkapost1-0
2024-06-11T12:39:26.053+05:30 INFO 36860 --- [salescon] [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerUtils : Setting offset for partition sqlkafkapost1-0 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-06-11T12:39:26.070+05:30 INFO 36860 --- [salescon] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Group100: partitions assigned: [sqlkafkapost1-0]
Received ' SalesOrder(EMPNO=null, ENAME=null, JOININGDATE=null, RELEASEDATE=null) ' from the Topic.
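
One possible explanation, which I have not confirmed: each record in the topic is wrapped in a "schema"/"payload" envelope, so EMPNO, ENAME and the dates sit one level below the top of the JSON object. If the deserializer binds SalesOrder to the top-level object, it would find none of those fields there. The sketch below models one record with plain Java maps to show the difference between the two lookups. It does not use Kafka or Spring, and the names EnvelopeSketch, sampleRecord, topLevelField and payloadField are made up for illustration.

```java
import java.util.Map;

// Hypothetical stand-alone model of one record value; not the real deserializer.
class EnvelopeSketch {

    // Hand-built stand-in for a record from the topic (schema part trimmed for brevity).
    static Map<String, Object> sampleRecord() {
        Map<String, Object> payload = Map.of(
                "EMPNO", 129,
                "ENAME", "venkat",
                "JOININGDATE", 1644667994000L,
                "RELEASEDATE", 1713830400000L);
        Map<String, Object> schema = Map.of("type", "struct", "optional", false);
        return Map.of("schema", schema, "payload", payload);
    }

    // Looks for a field directly on the envelope, where only "schema" and "payload" exist.
    static Object topLevelField(Map<String, Object> record, String name) {
        return record.get(name);
    }

    // Looks for the same field one level down, inside "payload".
    @SuppressWarnings("unchecked")
    static Object payloadField(Map<String, Object> record, String name) {
        Map<String, Object> payload = (Map<String, Object>) record.get("payload");
        return payload == null ? null : payload.get(name);
    }

    public static void main(String[] args) {
        Map<String, Object> record = sampleRecord();
        System.out.println("top level EMPNO = " + topLevelField(record, "EMPNO"));
        System.out.println("payload EMPNO   = " + payloadField(record, "EMPNO"));
    }
}
```

If this is indeed the cause, two directions seem worth trying: setting value.converter.schemas.enable=false on the connector so that only the payload is written to the topic, or reading the "payload" object in the consumer before mapping it to SalesOrder. The upper-case field names may also need explicit JSON property mapping, but I am not sure about that.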

Kindly let me know how I can receive the values as they appear in the Kafka topic.

Please let me know the solution and how the code should be written.

In the Kafka JDBC source connector, I am using:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

In the Spring Boot consumer configuration, I am using these deserializers:
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

So how can I receive the values correctly? Is there an issue with this configuration?

Thanks and Regards,
Anilkumar. S
