Hello.
I inserted data into the Oracle source DB.
The topic's offset has advanced, but there is actually no data in the topic.
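For reference, the quickest way to double-check whether topic-06 really holds any records is to compare its earliest and latest offsets and to try consuming it from the beginning. This is only a sketch: the broker address is the one used further below, and newer Kafka versions accept --bootstrap-server for GetOffsetShell instead of the older --broker-list.

# earliest (-2) vs. latest (-1) offset per partition
./kafka-run-class kafka.tools.GetOffsetShell --broker-list kafka-broker-03:9092 --topic topic-06 --time -2
./kafka-run-class kafka.tools.GetOffsetShell --broker-list kafka-broker-03:9092 --topic topic-06 --time -1

# try to read anything that is actually stored in the topic
./kafka-console-consumer --bootstrap-server kafka-broker-03:9092 --topic topic-06 --from-beginning --max-messages 10 --timeout-ms 10000

If the earliest and latest offsets are equal but non-zero, records were written and later removed (e.g. by retention); if the latest offset is still 0, nothing was ever produced to the topic.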
#source connector
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "timestamp.column.name": "REG_DATE",
  "incrementing.column.name": "SEQ",
  "errors.log.include.messages": "true",
  "producer.override.enable.idempotence": "true",
  "connection.password": "***",
  "tasks.max": "1",
  "producer.override.retries": "3",
  "transforms": "renameTopic,RenameField",
  "table.whitelist": "TB_SOURCE_KAFKA_TEST_6",
  "mode": "timestamp+incrementing",
  "producer.override.acks": "all",
  "db.timezone": "Asia/Seoul",
  "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "poll.interval.ms": "60000",
  "errors.log.enable": "true",
  "transforms.RenameField.renames": "SEQ:SOURCE_SEQ,REG_DATE:SOURCE_REG_DATE",
  "validate.non.null": "false",
  "transforms.renameTopic.replacement": "topic-06",
  "transforms.renameTopic.regex": "TB_SOURCE_KAFKA_TEST_6",
  "connection.user": "***",
  "name": "source-06",
  "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "connection.url": "jdbc:oracle:thin:@****:1521:STG?characterEncoding=UTF-8&serverTimezone=UTC",
  "producer.override.max.in.flight.requests.per.connection": "1"
}
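As a sanity check on the source side, the connector and task state can be queried over the Kafka Connect REST API. The host and port below are only the defaults for a local worker (REST port 8083); adjust them to the actual Connect worker.

# connector + task state (should be RUNNING; a failed task shows its stack trace here)
curl -s http://localhost:8083/connectors/source-06/status

# configuration the worker actually applied
curl -s http://localhost:8083/connectors/source-06/config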
#sink connector
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "errors.log.include.messages": "true",
  "connection.password": "***",
  "tasks.max": "3",
  "topics": "topic-06",
  "transforms": "renameTopic",
  "transforms.renameTopic.replacement": "TB_SINK_KAFKA_TEST_6",
  "consumer.override.isolation.level": "read_committed",
  "transforms.renameTopic.regex": "topic-06",
  "auto.evolve": "false",
  "connection.user": "***",
  "db.timezone": "Asia/Seoul",
  "name": "sink-06",
  "fetch.min.bytes": "1000",
  "auto.create": "false",
  "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "connection.url": "jdbc:sqlserver://***:1433;databaseName=***",
  "errors.log.enable": "true",
  "insert.mode": "insert"
}
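The same kind of check for the sink connector, again assuming the worker's default REST port; the console-consumer call mirrors the sink's read_committed isolation level when inspecting the topic by hand.

# sink connector + task states
curl -s http://localhost:8083/connectors/sink-06/status

# consume the topic the same way the sink's consumer would see it
./kafka-console-consumer --bootstrap-server kafka-broker-03:9092 --topic topic-06 --from-beginning --isolation-level read_committed --timeout-ms 10000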
#consumer group describe error
./kafka-consumer-groups --bootstrap-server kafka-broker-03:9092 --group connect-sink-06 --describe
Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1639036341058, tries=47, nextAllowedTryMs=1639036341159) timed out at 1639036341059 after 47 attempt(s)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1639036341058, tries=47, nextAllowedTryMs=1639036341159) timed out at 1639036341059 after 47 attempt(s)
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$describeConsumerGroups$1(ConsumerGroupCommand.scala:550)
at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.map(JavaCollectionWrappers.scala:309)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeConsumerGroups(ConsumerGroupCommand.scala:549)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:565)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:368)
at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:73)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:60)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1639036341058, tries=47, nextAllowedTryMs=1639036341159) timed out at 1639036341059 after 47 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: findCoordinator
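The describe call is timing out while looking up the group coordinator. In case it is just the tool's short default deadline (--timeout defaults to 5000 ms), the command can be retried with a longer timeout, and a plain --list shows whether the group is registered at all; if the cluster needs security settings, they would go into a properties file passed with --command-config.

# is the sink's consumer group visible at all?
./kafka-consumer-groups --bootstrap-server kafka-broker-03:9092 --list

# retry the describe with a longer timeout than the 5000 ms default
./kafka-consumer-groups --bootstrap-server kafka-broker-03:9092 --describe --group connect-sink-06 --timeout 60000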
#1. source db data
#2. topic-06 offset
#3. topic-ui → topic-06 shows no data
#4. sink db data
There are no errors in the log files…
How is this possible? What could cause the offset to advance while the topic stays empty?