Kafka connector - sink connector not getting data into Postgres DB

Hi,

I am working on an Oracle to Postgres migration, using Kafka Connect in standalone mode (connect-standalone with .properties files).

The data flows Oracle -> Kafka topic -> Postgres.

I built the source connector first and it works fine: values flow from Oracle into the Kafka topic. I used org.springframework.kafka.support.serializer.JsonSerializer as the value converter.
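
(Aside: as far as I know, Kafka Connect converters are expected to implement org.apache.kafka.connect.storage.Converter; the stock JSON setup from the Kafka docs would look roughly like this, shown here only as a reference sketch:)

# Worker-level converter settings (reference sketch, Kafka's built-in JSON converter)
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true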

My Oracle table looks like this:

Name         Null?     Type
-----------  --------  ------------
EMPNO        NOT NULL  NUMBER(5)
ENAME                  VARCHAR2(15)
JOININGDATE            TIMESTAMP(6)
RELEASEDATE            TIMESTAMP(6)

A sample row:

17  Kavya  17-09-21 12:00:00.000000000 AM  21-01-24 12:00:00.000000000 AM

In the Kafka topic I can see the data as below:


Struct{EMPNO=16,ENAME=Dimpu,JOININGDATE=2021-09-17 05:30:00.0,RELEASEDATE=2024-01-21 05:30:00.0}
Struct{EMPNO=17,ENAME=Kavya,JOININGDATE=2021-09-17 05:30:00.0,RELEASEDATE=2024-01-21 05:30:00.0}

So the source connector is complete and working fine.

Now I have started working on the sink connector, using org.springframework.kafka.support.serializer.JsonDeserializer as the value.converter.

I am running the source and sink connectors as follows:

c:\kafka\bin\windows>connect-standalone.bat …..\config\connect-standalone.properties …..\config\kafkasource1.properties (Source) …..\config\kafjora.properties (Sink)

1) Can we run the source and sink at the same time like this, or does a new worker need to be created (with a different rest.port, e.g. 8083)?
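
For reference, the general form documented for standalone mode is one worker plus any number of connector property files (file names here are placeholders):

connect-standalone.bat worker.properties connector1.properties [connector2.properties ...]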

I am not getting any errors, but the data is not being pushed into the Postgres database tables.

I am providing the logs below; please help me out with this issue.

I used the JSON serializer on the source side and the JSON deserializer on the sink side, along with transformations.
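
(One more reference sketch: my understanding is that per-connector converter overrides use the plain key.converter / value.converter keys in the connector's properties file; the internal.* converter settings are worker-internal and deprecated.)

# Per-connector converter override (reference sketch)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true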

Please check the logs:

[2024-02-09 10:04:11,255] INFO [person4|task-0] Initializing JDBC writer (io.confluent.connect.jdbc.sink.JdbcSinkTask:65)
[2024-02-09 10:04:11,255] INFO [person4|task-0] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171)
[2024-02-09 10:04:11,255] INFO [person4|task-0] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174)
[2024-02-09 10:04:11,257] INFO [person4|task-0] Initializing writer using SQL dialect: OracleDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:72)
[2024-02-09 10:04:11,258] INFO [person4|task-0] JDBC writer initialized (io.confluent.connect.jdbc.sink.JdbcSinkTask:74)
[2024-02-09 10:04:11,258] INFO [person4|task-0] WorkerSinkTask{id=person4-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:330)
[2024-02-09 10:04:11,259] INFO [person4|task-0] WorkerSinkTask{id=person4-0} Executing sink task (org.apache.kafka.connect.runtime.WorkerSinkTask:214)
[2024-02-09 10:04:11,271] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Cluster ID: CvC-2LM9SuCT0YjCC360vw (org.apache.kafka.clients.Metadata:287)
[2024-02-09 10:04:11,272] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:906)
[2024-02-09 10:04:11,273] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:576)
[2024-02-09 10:04:11,289] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Request joining group due to: need to re-join with the given member-id: connector-consumer-person4-0-ae801348-b47c-42e4-b7ae-5bd181b5b8fa (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1072)
[2024-02-09 10:04:11,290] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1072)
[2024-02-09 10:04:11,290] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:576)
[2024-02-09 10:04:11,294] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Successfully joined group with generation Generation{generationId=3, memberId='connector-consumer-person4-0-ae801348-b47c-42e4-b7ae-5bd181b5b8fa', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:637)
[2024-02-09 10:04:11,302] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Finished assignment for group at generation 3: {connector-consumer-person4-0-ae801348-b47c-42e4-b7ae-5bd181b5b8fa=Assignment(partitions=[ora2post-0, ora2post-1, ora2post-2, ora2post-3, ora2post-4])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:717)
[2024-02-09 10:04:11,311] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Successfully synced group in generation Generation{generationId=3, memberId='connector-consumer-person4-0-ae801348-b47c-42e4-b7ae-5bd181b5b8fa', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:812)
[2024-02-09 10:04:11,312] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Notifying assignor about the new Assignment(partitions=[ora2post-0, ora2post-1, ora2post-2, ora2post-3, ora2post-4]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:312)
[2024-02-09 10:04:11,314] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Adding newly assigned partitions: ora2post-0, ora2post-1, ora2post-2, ora2post-3, ora2post-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:324)
[2024-02-09 10:04:11,317] INFO [oracle-kafka|task-0] Begin using SQL query: select * from emp WHERE "RELEASEDATE" < ? AND (("RELEASEDATE" = ? AND "EMPNO" > ?) OR "RELEASEDATE" > ?) ORDER BY "RELEASEDATE","EMPNO" ASC (io.confluent.connect.jdbc.source.TableQuerier:182)
[2024-02-09 10:04:11,327] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Found no committed offset for partition ora2post-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1578)
[2024-02-09 10:04:11,327] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Found no committed offset for partition ora2post-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1578)
[2024-02-09 10:04:11,327] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Found no committed offset for partition ora2post-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1578)
[2024-02-09 10:04:11,329] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Found no committed offset for partition ora2post-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1578)
[2024-02-09 10:04:11,329] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Found no committed offset for partition ora2post-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1578)
[2024-02-09 10:04:11,338] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Resetting offset for partition ora2post-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:398)
[2024-02-09 10:04:11,339] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Resetting offset for partition ora2post-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:398)
[2024-02-09 10:04:11,340] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Resetting offset for partition ora2post-4 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:398)
[2024-02-09 10:04:11,340] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Resetting offset for partition ora2post-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:398)
[2024-02-09 10:04:11,340] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Resetting offset for partition ora2post-2 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:398)
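
(To check what is actually on the topic, the console consumer can be used; ora2post is the topic name the sink consumer is assigned in the logs above:)

c:\kafka\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic ora2post --from-beginning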

source connector

{
  name=my-jdbc-connector
  config={
    name=kafkasouece
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    name=oracle-kafka
    connection.url=jdbc:oracle:thin:@localhost:1521/orcl
    connection.user=kafka
    connection.password=
    task.max=1
    topic.prefix=kcheck
    mode=timestamp+incrementing
    query=select * from emp
    timestamp.column.name=RELEASEDATE
    incrementing.column.name=EMPNO
    poll.interval.ms=1000
    numeric.mapping=best_fit
    errors.tolerance=all
    internal.key.converter=io.confluent.connect.jdbc.JdbcSourceConnector
    internal.value.converter=org.springframework.kafka.support.serializer.JsonSerializer
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    dialect.name=OracleDatabaseDialect
  }
}
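
(A sketch of how I understand the source file could be tightened up: a single name, tasks.max rather than task.max, and per-connector converters instead of internal.*. The topic name ora2post is an assumption taken from the sink logs above; in query mode the JDBC source uses topic.prefix as the full topic name.)

# kafkasource1.properties - hypothetical tightened sketch, not my exact file
name=oracle-kafka
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@localhost:1521/orcl
connection.user=kafka
connection.password=
tasks.max=1
mode=timestamp+incrementing
query=select * from emp
timestamp.column.name=RELEASEDATE
incrementing.column.name=EMPNO
# in query mode this prefix is used as the whole topic name (ora2post assumed from the logs)
topic.prefix=ora2post
poll.interval.ms=1000
numeric.mapping=best_fit
errors.tolerance=all
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true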

sink connector

{
  name=jdbc_sink_connector_oracle_01
  config={
    name=Kafka-sink-postgres1
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    task.max=1
    topics=sorder

    internal.key.converter=io.confluent.connect.jdbc.JdbcSourceConnector
    internal.value.converter=org.springframework.kafka.support.serializer.JsonDeserializer
    internal.key.converter.schemas.enable=true
    internal.value.converter.schemas.enable=true
    dialect.name=OracleDatabaseDialect

    # JDBC sink connector specific properties
    connection.url=jdbc:postgresql://localhost:5432/Kafka_PostDB
    connection.user=postgres
    connection.password=
    poll.interval.ms=1000
    numeric.mapping=best_fit
    insert.mode=upsert
    auto.create=true
    batch.size=2
    table.name.format=emp
    pk.mode=record_value
    pk.fields=empno
    db.timezone=Asia/Kolkata
  }
}
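
(And a matching sketch for the sink, with the assumptions flagged: the connector name person4 and topic ora2post are taken from the logs above, not from my file; dialect.name is dropped so the dialect is inferred from the Postgres JDBC URL, since the log line "Initializing writer using SQL dialect: OracleDatabaseDialect" looks wrong for a Postgres target; pk.fields is uppercased to match the EMPNO field seen in the topic records.)

# kafjora.properties - hypothetical tightened sketch, not my exact file
name=person4
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# topic assumed from the consumer assignment in the logs
topics=ora2post
connection.url=jdbc:postgresql://localhost:5432/Kafka_PostDB
connection.user=postgres
connection.password=
insert.mode=upsert
auto.create=true
batch.size=2
table.name.format=emp
# upsert needs a key; the field case must match the record (EMPNO in the Struct above)
pk.mode=record_value
pk.fields=EMPNO
db.timezone=Asia/Kolkata
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true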

Please help me out with this.
