Topic offset changed but no data in topic

Hello.

I inserted data into the Oracle source database.
The topic's offset advanced, but the topic does not actually contain any data.

#source connector

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "timestamp.column.name": "REG_DATE",
  "incrementing.column.name": "SEQ",
  "errors.log.include.messages": "true",
  "producer.override.enable.idempotence": "true",
  "connection.password": "***",
  "tasks.max": "1",
  "producer.override.retries": "3",
  "transforms": "renameTopic,RenameField",
  "table.whitelist": "TB_SOURCE_KAFKA_TEST_6",
  "mode": "timestamp+incrementing",
  "producer.override.acks": "all",
  "db.timezone": "Asia/Seoul",
  "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "poll.interval.ms": "60000",
  "errors.log.enable": "true",
  "transforms.RenameField.renames": "SEQ:SOURCE_SEQ,REG_DATE:SOURCE_REG_DATE",
  "validate.non.null": "false",
  "transforms.renameTopic.replacement": "topic-06",
  "transforms.renameTopic.regex": "TB_SOURCE_KAFKA_TEST_6",
  "connection.user": "***",
  "name": "source-06",
  "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "connection.url": "jdbc:oracle:thin:@****:1521:STG?characterEncoding=UTF-8&serverTimezone=UTC",
  "producer.override.max.in.flight.requests.per.connection": "1"
}

#sink connector

{
                "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
                "errors.log.include.messages": "true",
                "connection.password": "***",
                "tasks.max": "3",
                "topics": "topic-06",
                "transforms": "renameTopic",
                "transforms.renameTopic.replacement": "TB_SINK_KAFKA_TEST_6",
                "consumer.override.isolation.level": "read_committed",
                "transforms.renameTopic.regex": "topic-06",
                "auto.evolve": "false",
                "connection.user": "***",
                "db.timezone": "Asia/Seoul",
                "name": "sink-06",
                "fetch.min.bytes": "1000",
                "auto.create": "false",
                "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
                "connection.url": "jdbc:sqlserver://***:1433;databaseName=***",
                "errors.log.enable": "true",
                "insert.mode": "insert"
}
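
As a quick sanity check on the two RegexRouter transforms configured above, the renaming chain can be sketched in a few lines of Python. This is only a rough emulation, assuming RegexRouter renames a topic when the regex matches the whole topic name, and assuming the source connector emits the bare table name as the topic (no topic.prefix appears in the posted config). The helper name `regex_router` is made up for this illustration.

```python
import re


def regex_router(topic: str, regex: str, replacement: str) -> str:
    """Rough emulation of a RegexRouter transform (hypothetical helper).

    The topic is renamed only when the regex matches the entire topic name;
    otherwise it is passed through unchanged.
    """
    if re.fullmatch(regex, topic):
        return re.sub(regex, replacement, topic, count=1)
    return topic


# Source side: table name -> topic name (values taken from the source config).
source_topic = regex_router(
    "TB_SOURCE_KAFKA_TEST_6", "TB_SOURCE_KAFKA_TEST_6", "topic-06"
)

# Sink side: topic name -> target table name (values taken from the sink config).
sink_table = regex_router(source_topic, "topic-06", "TB_SINK_KAFKA_TEST_6")

print(source_topic, sink_table)
```

Under these assumptions the source writes to topic-06 and the sink maps it to TB_SINK_KAFKA_TEST_6, so the topic routing itself looks consistent between the two connectors.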

#consumer group describe error

./kafka-consumer-groups --bootstrap-server kafka-broker-03:9092 --group connect-sink-06 --describe

Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1639036341058, tries=47, nextAllowedTryMs=1639036341159) timed out at 1639036341059 after 47 attempt(s)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1639036341058, tries=47, nextAllowedTryMs=1639036341159) timed out at 1639036341059 after 47 attempt(s)
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$describeConsumerGroups$1(ConsumerGroupCommand.scala:550)
	at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
	at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
	at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.map(JavaCollectionWrappers.scala:309)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeConsumerGroups(ConsumerGroupCommand.scala:549)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:565)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:368)
	at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:73)
	at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:60)
	at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1639036341058, tries=47, nextAllowedTryMs=1639036341159) timed out at 1639036341059 after 47 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: findCoordinator

#1. source db data
image

#2. topic-06 offset
image

#3. topic-ui → topic-06 has no data

#4. sink db data
image

There are no errors in the log file…
Is it possible for this to happen?

Not sure if this could be the issue, but is the JDBC Source Connector able to handle idempotence properly? (https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-7077?filter=allopenissues is still OPEN, as is KIP-618: Exactly-Once Support for Source Connectors.)

Can you try without the producer.override.enable.idempotence setting?
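
One way to run that experiment is to produce a copy of the source connector config with the idempotence override removed. The sketch below assumes the setting in question is producer.override.enable.idempotence; the `source_config` dict is an abridged copy of the posted config, and `without_idempotence` is a hypothetical helper name.

```python
import json

# Assumption: this is the key the reply suggests dropping.
IDEMPOTENCE_KEY = "producer.override.enable.idempotence"


def without_idempotence(config: dict) -> dict:
    """Return a copy of the connector config without the idempotence override."""
    return {key: value for key, value in config.items() if key != IDEMPOTENCE_KEY}


# Abridged copy of the source connector config from the question.
source_config = {
    "name": "source-06",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "mode": "timestamp+incrementing",
    "producer.override.enable.idempotence": "true",
    "producer.override.acks": "all",
    "producer.override.retries": "3",
    "producer.override.max.in.flight.requests.per.connection": "1",
}

trial_config = without_idempotence(source_config)
print(json.dumps(trial_config, indent=2))
```

The resulting JSON could then be applied to the connector, for example through the Kafka Connect REST API (PUT /connectors/source-06/config), to see whether records show up in topic-06 once the override is gone.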
