Hi,
My data is coming from Oracle into the Kafka topic as shown below:
{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "int32", "optional": false, "field": "EMPNO"},
      {"type": "string", "optional": true, "field": "ENAME"},
      {"type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "JOININGDATE"},
      {"type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "RELEASEDATE"}
    ],
    "optional": false
  },
  "payload": {
    "EMPNO": 11,
    "ENAME": "Ramana",
    "JOININGDATE": 1631836800000,
    "RELEASEDATE": 1705795200000
  }
}
The data arrives wrapped in this schema/payload envelope because I have set the following converter configuration:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
dialect.name=PostgreSqlDatabaseDialect
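For context, the sink is the Confluent JDBC sink connector (the connector name and topic below are taken from the logs further down); a minimal sketch of how the connector-level part of the config hangs together, with the connection details shown as placeholders rather than my real values:

name=Kafka-sink-postgres
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=hellokafka
dialect.name=PostgreSqlDatabaseDialect
# connection details below are placeholders
connection.url=jdbc:postgresql://localhost:5432/mydb
connection.user=postgres
connection.password=********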
On the PostgreSQL side I created the table with upper-case (caps) names. What I have observed is that Oracle stores identifiers in caps even when I create them in lower case, whereas Postgres folds them to lower case even though I created the table in caps.
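Just to illustrate the case folding I mean (a generic example, not my actual DDL):

-- Oracle folds unquoted identifiers to UPPER case:
CREATE TABLE emp (empno NUMBER(4));      -- stored as EMP / EMPNO
-- Postgres folds unquoted identifiers to lower case:
CREATE TABLE EMP (EMPNO INTEGER);        -- stored as emp / empno
-- Postgres keeps caps only when the identifiers are quoted:
CREATE TABLE "EMP" ("EMPNO" INTEGER);    -- stored as EMP / EMPNO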
So I used the ReplaceField transform to rename the fields as below:
transforms = RenameField
transforms.RenameField.type = org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames = EMPNO:empno,ENAME:ename,JOININGDATE:joiningdate,RELEASEDATE:releasedate
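If I understand ReplaceField correctly, after the rename the value should carry lower-case field names, so the payload part of the example message above would become:

{"empno":11,"ename":"Ramana","joiningdate":1631836800000,"releasedate":1705795200000}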
Still, my issue is not resolved and data is not being pushed into Postgres. Any clue?
I can see the following in the Connect logs (the sink task shuts down, and at the end the source connector's producer reports a disconnect):
[2024-02-10 23:13:55,289] INFO [Kafka-sink-postgres |task-0] Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:176)
[2024-02-10 23:13:55,296] INFO [Kafka-sink-postgres |task-0] [Consumer clientId=connector-consumer-Kafka-sink-postgres -0, groupId=connect-Kafka-sink-postgres] Revoke previously assigned partitions hellokafka-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:343)
[2024-02-10 23:13:55,296] INFO [Kafka-sink-postgres |task-0] [Consumer clientId=connector-consumer-Kafka-sink-postgres -0, groupId=connect-Kafka-sink-postgres] Member connector-consumer-Kafka-sink-postgres -0-ace35691-da90-4fd5-92c2-f2ecaadcfa17 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1133)
[2024-02-10 23:13:55,298] INFO [Kafka-sink-postgres |task-0] [Consumer clientId=connector-consumer-Kafka-sink-postgres -0, groupId=connect-Kafka-sink-postgres] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1025)
[2024-02-10 23:13:55,298] INFO [Kafka-sink-postgres |task-0] [Consumer clientId=connector-consumer-Kafka-sink-postgres -0, groupId=connect-Kafka-sink-postgres] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1072)
[2024-02-10 23:13:55,794] INFO [Kafka-sink-postgres |task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:693)
[2024-02-10 23:13:55,795] INFO [Kafka-sink-postgres |task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:697)
[2024-02-10 23:13:55,796] INFO [Kafka-sink-postgres |task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:703)
[2024-02-10 23:13:55,798] INFO [Kafka-sink-postgres |task-0] App info kafka.consumer for connector-consumer-Kafka-sink-postgres -0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2024-02-10 23:22:55,081] INFO [oracle-kafka|task-0] [Producer clientId=connector-producer-oracle-kafka-0] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)