Hello,
I am configuring JDBC Connector for the first time to synchronize between two Oracle 11g databases.
It is a connector that in theory is simple, reading new lines from the database and writing to the other database.
Below are the settings I have provided for the Source Connector.
name = oracle-db-source
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max = 1
connection.url = jdbc:oracle:thin:@oracle-db-source:1521:XE
connection.user = SC
connection.password = oracle
dialect.name = OracleDatabaseDialect
mode = incrementing
incrementing.column.name = ID
validate.non.null = false
query = SELECT ID, SC_ID, CREATE_DATE, UPDATE_DATE FROM SC.EXAMPLE
However, when starting the connector and checking the Kafka Connect logs, I get the error message below.
2023-09-06 14:34:01 [2023-09-06 13:34:01,401] INFO [oracle-db-source|task-0] Validating JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:171)
2023-09-06 14:34:01 [2023-09-06 13:34:01,401] INFO [oracle-db-source|task-0] Validated JDBC URL. (io.confluent.connect.jdbc.dialect.DatabaseDialects:174)
2023-09-06 14:34:01 [2023-09-06 13:34:01,402] INFO [oracle-db-source|task-0] Using JDBC dialect Oracle (io.confluent.connect.jdbc.source.JdbcSourceTask:138)
2023-09-06 14:34:01 [2023-09-06 13:34:01,405] INFO [oracle-db-source|task-0] [Producer clientId=connector-producer-oracle-db-source-0] Cluster ID: uCie5uMhTf6uDKvGbfid8Q (org.apache.kafka.clients.Metadata:279)
2023-09-06 14:34:01 [2023-09-06 13:34:01,819] INFO [oracle-db-source|task-0] Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:307)
2023-09-06 14:34:01 [2023-09-06 13:34:01,820] INFO [oracle-db-source|task-0] WorkerSourceTask{id=oracle-db-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
2023-09-06 14:34:01 [2023-09-06 13:34:01,919] INFO [oracle-db-source|task-0] Begin using SQL query: SELECT ID, SC_ID, CREATE_DATE, UPDATE_DATE FROM SC.EXAMPLE WHERE "ID" > ? ORDER BY "ID" ASC (io.confluent.connect.jdbc.source.TableQuerier:182)
2023-09-06 14:34:01 [2023-09-06 13:34:01,947] WARN [oracle-db-source|task-0] [Producer clientId=connector-producer-oracle-db-source-0] Error while fetching metadata with correlation id 3 : {=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient:1073)
2023-09-06 14:34:01 [2023-09-06 13:34:01,948] ERROR [oracle-db-source|task-0] [Producer clientId=connector-producer-oracle-db-source-0] Metadata response reported invalid topics [] (org.apache.kafka.clients.Metadata:294)
2023-09-06 14:34:01 [2023-09-06 13:34:01,948] ERROR [oracle-db-source|task-0] WorkerSourceTask{id=oracle-db-source-0} failed to send record to : (org.apache.kafka.connect.runtime.WorkerSourceTask:370)
2023-09-06 14:34:01 org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: []
2023-09-06 14:34:01 [2023-09-06 13:34:01,951] INFO [oracle-db-source|task-0] WorkerSourceTask{id=oracle-db-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:478)
2023-09-06 14:34:01 [2023-09-06 13:34:01,953] INFO [oracle-db-source|task-0] WorkerSourceTask{id=oracle-db-source-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:495)
2023-09-06 14:34:06 [2023-09-06 13:34:06,953] ERROR [oracle-db-source|task-0] WorkerSourceTask{id=oracle-db-source-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:500)
2023-09-06 14:34:06 [2023-09-06 13:34:06,953] ERROR [oracle-db-source|task-0] WorkerSourceTask{id=oracle-db-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
2023-09-06 14:34:06 org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
2023-09-06 14:34:06 at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:282)
2023-09-06 14:34:06 at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:336)
2023-09-06 14:34:06 at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
2023-09-06 14:34:06 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
2023-09-06 14:34:06 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
2023-09-06 14:34:06 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-09-06 14:34:06 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-09-06 14:34:06 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-09-06 14:34:06 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-09-06 14:34:06 at java.base/java.lang.Thread.run(Thread.java:834)
2023-09-06 14:34:06 Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: []
Docker Compose File
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9094:9094"
environment:
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_LISTENERS: INTERNAL://:9092,OUTSIDE://:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://host.docker.internal:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
extra_hosts:
- "host.docker.internal:172.17.0.1"
control-center:
image: confluentinc/cp-enterprise-control-center:6.0.1
hostname: control-center
container_name: control-center
depends_on:
- kafka
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:9092'
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_CONNECT_CLUSTER: http://kafka-connect:8083
PORT: 9021
extra_hosts:
- "host.docker.internal:172.17.0.1"
kafka-connect:
image: confluentinc/cp-kafka-connect-base:6.0.0
container_name: kafka-connect
depends_on:
- zookeeper
- kafka
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
# # Optional settings to include to support Confluent Control Center
# CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
# CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
# ---------------
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
# If you want to use the Confluent Hub installer to d/l component, but make them available
# when running this offline, spin up the stack once and then run :
# docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
volumes:
- ./kafka-connect/data:/data
# In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
command:
- bash
- -c
- |
echo "Installing Connector"
confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:10.0.1
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
confluent-hub install --no-prompt confluentinc/kafka-connect-oracle-cdc:latest
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
extra_hosts:
- "host.docker.internal:172.17.0.1"
oracle-db-source:
container_name: oracle-db-source
image: oracleinanutshell/oracle-xe-11g:latest
ports:
- 1521:1521
- 5500:5500
volumes:
- ./oracle-db-source/oradata:/opt/oracle/oradata
- ./oracle-db-source/backup:/opt/oracle/backup
environment:
- ORACLE_ALLOW_REMOTE=YES
- ORACLE_HOME=/u01/app/oracle/product/11.2.0/xe
extra_hosts:
- "host.docker.internal:172.17.0.1"
oracle-db-target:
container_name: oracle-db-target
image: oracleinanutshell/oracle-xe-11g:latest
ports:
- 3042:1521
- 3300:5500
volumes:
- ./oracle-db-target/oradata:/opt/oracle/oradata
- ./oracle-db-target/backup:/opt/oracle/backup
environment:
- ORACLE_ALLOW_REMOTE=YES
- ORACLE_HOME=/u01/app/oracle/product/11.2.0/xe
extra_hosts:
- "host.docker.internal:172.17.0.1"