Hi,
I am working on an Oracle to Postgres migration using Kafka Connect in standalone mode (connect-standalone.properties).
The data flows as follows: Oracle -> Kafka topic -> Postgres.
The source connector works fine: values are passing from Oracle to the Kafka topic. As the value converter I used org.springframework.kafka.support.serializer.JsonSerializer.
The Oracle table definition and a sample row are shown below.
Name          Null?      Type
EMPNO         NOT NULL   NUMBER(5)
ENAME                    VARCHAR2(15)
JOININGDATE              TIMESTAMP(6)
RELEASEDATE              TIMESTAMP(6)

Sample row:
17   Kavya   17-09-21 12:00:00.000000000 AM   21-01-24 12:00:00.000000000 AM
In the Kafka topic I can see the data as below.
Struct{EMPNO=16,ENAME=Dimpu,JOININGDATE=2021-09-17 05:30:00.0,RELEASEDATE=2024-01-21 05:30:00.0}
Struct{EMPNO=17,ENAME=Kavya,JOININGDATE=2021-09-17 05:30:00.0,RELEASEDATE=2024-01-21 05:30:00.0}
The source connector is complete and working fine.
Now I have started working on the sink connector, using org.springframework.kafka.support.serializer.JsonSerializer as the value.converter.
I am running the source and sink connectors as follows:
c:\kafka\bin\windows>connect-standalone.bat …..\config\connect-standalone.properties …..\config\kafkasource1.properties (Source) …..\config\kafjora.properties (Sink)
1) Can the source and sink connectors run at the same time as shown above, or do I need to create a new worker and change rest.port (8083)?
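A note on question 1: connect-standalone takes one worker properties file followed by one or more connector properties files, so a source and a sink can be started together in a single worker, and a second REST port is not needed in that case. A different port only matters when a second worker process runs on the same machine. The sketch below shows worker-level settings under stated assumptions: the JsonConverter choice, the offsets file path, and port 8084 are illustrative and not taken from the setup described here. Converters also have to implement Kafka Connect's Converter interface; the Spring JsonSerializer and JsonDeserializer classes are Kafka client serializers, so they are probably not usable as value.converter.

```properties
# connect-standalone.properties (worker-level sketch; values are assumptions)
bootstrap.servers=localhost:9092

# Converters shipped with Kafka Connect. With schemas.enable=true every JSON
# message carries its schema, which the JDBC sink needs to build SQL statements.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Standalone mode stores source offsets in a local file (path is an assumption).
offset.storage.file.filename=/tmp/connect.offsets

# Only needed for a second worker process on the same machine
# (port 8084 is a hypothetical choice):
# listeners=http://localhost:8084
```

The Struct{...} text seen in the topic may indicate that the values were written as plain strings rather than JSON; if so, records already in the topic would not be readable by JsonConverter, and a fresh topic would be needed after changing the converter.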
I am not getting any error, but the data is not being pushed into the Postgres database tables.
I used JsonSerializer on the source side and JsonDeserializer on the sink side for the transformations.
Please check the logs below and help me sort out this issue.
[2024-02-09 10:04:11,255] INFO [person4|task-0] Initializing JDBC writer (io.confluent.connect.jdbc.sink.JdbcSinkTask:65)
[2024-02-09 10:04:11,255] INFO [person4|task-0] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171)
[2024-02-09 10:04:11,255] INFO [person4|task-0] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174)
[2024-02-09 10:04:11,257] INFO [person4|task-0] Initializing writer using SQL dialect: OracleDatabaseDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:72)
[2024-02-09 10:04:11,258] INFO [person4|task-0] JDBC writer initialized (io.confluent.connect.jdbc.sink.JdbcSinkTask:74)
[2024-02-09 10:04:11,258] INFO [person4|task-0] WorkerSinkTask{id=person4-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:330)
[2024-02-09 10:04:11,259] INFO [person4|task-0] WorkerSinkTask{id=person4-0} Executing sink task (org.apache.kafka.connect.runtime.WorkerSinkTask:214)
[2024-02-09 10:04:11,271] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Cluster ID: CvC-2LM9SuCT0YjCC360vw (org.apache.kafka.clients.Metadata:287)
[2024-02-09 10:04:11,272] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:906)
[2024-02-09 10:04:11,273] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:576)
[2024-02-09 10:04:11,289] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Request joining group due to: need to re-join with the given member-id: connector-consumer-person4-0-ae801348-b47c-42e4-b7ae-5bd181b5b8fa (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1072)
[2024-02-09 10:04:11,290] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1072)
[2024-02-09 10:04:11,290] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:576)
[2024-02-09 10:04:11,294] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Successfully joined group with generation Generation{generationId=3, memberId='connector-consumer-person4-0-ae801348-b47c-42e4-b7ae-5bd181b5b8fa', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:637)
[2024-02-09 10:04:11,302] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Finished assignment for group at generation 3: {connector-consumer-person4-0-ae801348-b47c-42e4-b7ae-5bd181b5b8fa=Assignment(partitions=[ora2post-0, ora2post-1, ora2post-2, ora2post-3, ora2post-4])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:717)
[2024-02-09 10:04:11,311] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Successfully synced group in generation Generation{generationId=3, memberId='connector-consumer-person4-0-ae801348-b47c-42e4-b7ae-5bd181b5b8fa', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:812)
[2024-02-09 10:04:11,312] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Notifying assignor about the new Assignment(partitions=[ora2post-0, ora2post-1, ora2post-2, ora2post-3, ora2post-4]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:312)
[2024-02-09 10:04:11,314] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Adding newly assigned partitions: ora2post-0, ora2post-1, ora2post-2, ora2post-3, ora2post-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:324)
[2024-02-09 10:04:11,317] INFO [oracle-kafka|task-0] Begin using SQL query: select * from emp WHERE "RELEASEDATE" < ? AND (("RELEASEDATE" = ? AND "EMPNO" > ?) OR "RELEASEDATE" > ?) ORDER BY "RELEASEDATE","EMPNO" ASC (io.confluent.connect.jdbc.source.TableQuerier:182)
[2024-02-09 10:04:11,327] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Found no committed offset for partition ora2post-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1578)
[2024-02-09 10:04:11,327] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Found no committed offset for partition ora2post-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1578)
[2024-02-09 10:04:11,327] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Found no committed offset for partition ora2post-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1578)
[2024-02-09 10:04:11,329] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Found no committed offset for partition ora2post-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1578)
[2024-02-09 10:04:11,329] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Found no committed offset for partition ora2post-2 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1578)
[2024-02-09 10:04:11,338] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Resetting offset for partition ora2post-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:398)
[2024-02-09 10:04:11,339] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Resetting offset for partition ora2post-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:398)
[2024-02-09 10:04:11,340] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Resetting offset for partition ora2post-4 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:398)
[2024-02-09 10:04:11,340] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Resetting offset for partition ora2post-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:398)
[2024-02-09 10:04:11,340] INFO [person4|task-0] [Consumer clientId=connector-consumer-person4-0, groupId=connect-person4] Resetting offset for partition ora2post-2 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:398)
Source connector configuration:
{
name= my-jdbc-connector
config= {
name=kafkasouece
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
name=oracle-kafka
connection.url=jdbc:oracle:thin:@localhost:1521/orcl
connection.user=kafka
connection.password=
task.max=1
topic.prefix= kcheck
mode= timestamp+incrementing
query=select * from emp
timestamp.column.name=RELEASEDATE
incrementing.column.name=EMPNO
poll.interval.ms=1000
numeric.mapping=best_fit
errors.tolerance=all
internal.key.converter=io.confluent.connect.jdbc.JdbcSourceConnector
internal.value.converter=org.springframework.kafka.support.serializer.JsonSerializer
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
dialect.name=OracleDatabaseDialect
}
}
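For comparison, here is a hedged sketch of how the same source settings might look as a flat properties file, which is the format connect-standalone reads. It is not a verified configuration. The file name comes from the command line above and the connector name from the log prefix; the per-connector JsonConverter override is an assumption. The task count key is spelled tasks.max, and the internal.* converter entries and errors.tolerance are left out.

```properties
# kafkasource1.properties (sketch under assumptions, not a tested configuration)
name=oracle-kafka
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

connection.url=jdbc:oracle:thin:@localhost:1521/orcl
connection.user=kafka
connection.password=

mode=timestamp+incrementing
query=select * from emp
timestamp.column.name=RELEASEDATE
incrementing.column.name=EMPNO
poll.interval.ms=1000
numeric.mapping=best_fit

# With a custom query, topic.prefix is used as the full topic name.
topic.prefix=kcheck

# Assumption: write JSON with an embedded schema so a JDBC sink can read it.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
```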
Sink connector configuration:
{
name=jdbc_sink_connector_oracle_01
config={
name= Kafka-sink-postgres1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
task.max=1
topics=sorder
internal.key.converter=io.confluent.connect.jdbc.JdbcSourceConnector
internal.value.converter=org.springframework.kafka.support.serializer.JsonDeserializer
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
dialect.name=OracleDatabaseDialect
//JDBC Sink Connectors specific properties
connection.url=jdbc:postgresql://localhost:5432/Kafka_PostDB
connection.user=postgres
connection.password=
poll.interval.ms=1000
numeric.mapping=best_fit
insert.mode=upsert
auto.create=true
batch.size=2
table.name.format=emp
pk.mode=record_value
pk.fields=empno
db.timezone=Asia/Kolkata
}
}
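A few things in the sink settings may be worth checking, sketched below as a flat properties file. The logs show the sink task subscribed to ora2post partitions and using OracleDatabaseDialect, while the target is Postgres, the configuration above lists topics=sorder, and the source's topic.prefix is kcheck. The topic value, the PostgreSqlDatabaseDialect setting, the upper-case pk.fields, and the JsonConverter lines are assumptions for illustration, not confirmed fixes.

```properties
# kafjora.properties (sketch under assumptions, not a tested configuration)
name=Kafka-sink-postgres1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

# Assumption: must be the topic the source actually writes to.
topics=kcheck

connection.url=jdbc:postgresql://localhost:5432/Kafka_PostDB
connection.user=postgres
connection.password=

# The target database is Postgres, so the Oracle dialect is replaced here.
dialect.name=PostgreSqlDatabaseDialect

insert.mode=upsert
auto.create=true
batch.size=2
table.name.format=emp
pk.mode=record_value
# Record field names are case-sensitive; the topic records show EMPNO.
pk.fields=EMPNO
db.timezone=Asia/Kolkata

# Assumption: same converter as on the source side, with schemas enabled.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
```

The poll.interval.ms and numeric.mapping entries are omitted from this sketch because they are source connector options.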
Please help with this.