I’m new to Kafka Connect. I set up an example running Kafka Connect locally via Docker to take data from an Oracle database table and make it available in a SQL Server database. At first everything worked well, but after running for a while my source connector stopped working and I can’t identify why. Can you please help me? Below are the docker-compose file I’m using and the configs for the source and sink connectors.
Docker-compose:
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: LISTENER_PUBLIC://kafka:9092,LISTENER_INTERNAL://localhost:9094
      KAFKA_ADVERTISED_LISTENERS: LISTENER_PUBLIC://kafka:9092,LISTENER_INTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_PUBLIC:PLAINTEXT,LISTENER_INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_PUBLIC
      # KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      # KAFKA_LISTENERS: INTERNAL://:9092,OUTSIDE://:9094
      # KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://host.docker.internal:9094
      # KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
    extra_hosts:
      - "host.docker.internal:172.17.0.1"
  control-center:
    image: confluentinc/cp-enterprise-control-center:6.0.1
    hostname: control-center
    depends_on:
      - kafka
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_CONNECT_CLUSTER: http://kafka-connect:8083
      PORT: 9021
    extra_hosts:
      - "host.docker.internal:172.17.0.1"
  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.0.0
    container_name: kafka-connect
    depends_on:
      - zookeeper
      - kafka
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      # # Optional settings to include to support Confluent Control Center
      # CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      # CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      # ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    # If you want to use the Confluent Hub installer to d/l components, but make them available
    # when running this offline, spin up the stack once and then run:
    # docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    volumes:
      - $PWD/data:/data
    # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt confluentinc/kafka-connect-rabbitmq-sink:latest
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
        confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:latest
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity
    extra_hosts:
      - "host.docker.internal:172.17.0.1"
Config Source:
name=oracle-sale-source-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:oracle:thin:user/password@host:port:SID
query=select * from ( \
  select ven.is_ven "SaleId", ven.dt_ven_ret "DateImport", sku.is_sku "PresentationId", sku.nm_sku "PresentationName", \
         prod.is_pro "ProductId", prod.nm_pro "ProductName", ind.is_for "IndustryId", ind.nm_for "IndustryTradeName", \
         loj.is_loj "StoreId", loj.nm_loj "StoreCorporateName", rede.is_red "ChainId", rede.nm_red "ChainTradeName", \
         pln.is_pln "BenefitId", pln.nm_pln "BenefitName", cno.is_cno "ClientId", cno.nm_cno "ClientTradeName", \
         xcn.is_xcn "TransactionId", xcn.tp_xcn "TransactionType", ven.cd_ven_bar "CodeBar", ven.VL_VEN_FAB "ReplacementPrice", \
         ven.QT_VEN "SoldQuantity", ven.DT_VEN "SaleDate", ven.TX_VEN_DSC_IND "ReplacementDiscount", \
         ven.VL_VEN_DSC_IND "ReplacementDiscountValue", nsu.is_xcn_ext "ExternalNumberTransaction" \
  from pbm_ret.t_ven ven \
  inner join pbm_ret.t_sku sku on ven.is_sku = sku.is_sku \
  inner join pbm_ret.t_pro prod on sku.is_pro = prod.is_pro \
  inner join pbm_ret.t_for ind on prod.is_for = ind.is_for \
  inner join pbm_ret.t_xcn xcn on ven.is_xcn = xcn.is_xcn \
  inner join pbm_ret.t_loj loj on ven.is_loj = loj.is_loj \
  inner join pbm_ret.t_red rede on loj.is_red = rede.is_red \
  inner join pbm_ret.t_pln pln on ven.is_pln = pln.is_pln \
  inner join pbm_ret.t_cno cno on pln.is_cno = cno.is_cno \
  inner join pbm_ret.t_nsu nsu on xcn.is_xcn = nsu.is_xcn \
  where is_ven >= (select max(is_ven)-10000 from t_ven) and ven.tx_ven_dsc_ind > 0 \
) tb_Sale
mode=timestamp
timestamp.column.name=DateImport
topic.prefix=Tb_Sale
numeric.mapping=best_fit
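With mode=timestamp, the JDBC source only fetches rows whose DateImport is strictly greater than the last timestamp offset it committed, so if that column stops advancing (or the stored offset gets ahead of the table data) the connector goes quiet without ever showing as FAILED. The last committed offset can be read back from the internal offsets topic; a sketch, assuming the Kafka CLI tools are available inside the kafka-connect container:

docker exec kafka-connect kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic _connect-offsets \
  --from-beginning \
  --property print.key=true

The key identifies the connector and the value holds the last timestamp it will query forward from.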
Config Sink:
name=sql-sale-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=Tb_Sale
connection.url=jdbc:sqlserver://host:port;databaseName=ProductDev
connection.user=usr_dev
connection.password=dev123
insert.mode=upsert
auto.create=true
auto.evolve=false
pk.mode=record_value
pk.fields=SaleId
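Since the worker in the compose file runs in distributed mode (it has a group id and config/offset/status storage topics), these .properties files are not read from disk; each connector has to be submitted as JSON through the REST API. This is how I register the sink, using the same values as above:

curl -s -X PUT http://localhost:8083/connectors/sql-sale-sink-connector/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "Tb_Sale",
    "connection.url": "jdbc:sqlserver://host:port;databaseName=ProductDev",
    "connection.user": "usr_dev",
    "connection.password": "dev123",
    "insert.mode": "upsert",
    "auto.create": "true",
    "auto.evolve": "false",
    "pk.mode": "record_value",
    "pk.fields": "SaleId"
  }'

PUT /connectors/<name>/config creates the connector if it does not exist and updates it otherwise, so it can be re-run safely while iterating on the config.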