We’re trying to ingest log files and load each line into a DB using a JDBC connector. I’m using SpoolDirLineDelimitedSourceConnector as the source connector and TeradataSinkConnector as the sink connector, along with StringConverter and JsonConverter, but for some reason this configuration is not working: the data doesn’t show up in the sink DB. Does anyone know why this is not working, or have any ideas?
Thx…
hey,
any obvious errors?
what does your config look like?
best,
michael
Hey Michael,
The strange thing is that there are no errors in the log file. I do see the payload in the Kafka topic, populated from the source files, but the sink DB does not get populated. I even tried creating the table there beforehand, but that didn’t help. Here are the configs:
Worker:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#If we run multiple standalone workers on the same host machine, the following two configuration properties must be unique for each worker
#Using the same file name for two workers will cause offset data to be deleted or overwritten with different values.
offset.storage.file.filename=/tmp/connect.offsets
#A list of URIs the REST API will listen on in the format protocol://host:port,protocol2://host2:port–the protocol is either HTTP or HTTPS.
#You can specify hostname as 0.0.0.0 to bind to all interfaces or leave hostname empty to bind to the default interface
#The current default port is now 8083 - HTTP://:8083
#listeners=
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,/home/myuser/confluent-7.3.2/share/confluent-hub-components
Source Connector:
name=LineDelimSchemaSpoolDirTD
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector
input.path=/home/myuser/kafka-connect/logdest
error.path=/home/myuser/kafka-connect/logdest/error
finished.path=/home/myuser/kafka-connect/logdest/finished
halt.on.error=false
topic=logfiledest
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
input.file.pattern=.*\.log
file.minimum.age.ms=100
Sink Connector:
name=teradata-log-sink
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
connector.class=io.confluent.connect.teradata.TeradataSinkConnector
tasks.max=1
topics=logfiledest
teradata.url=jdbc:teradata://hostname
teradata.database=sinkDBName
teradata.username=username
teradata.password=******
pk.mode=none
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
errors.tolerance=all
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.topic.name=tdlatdq
errors.deadletterqueue.context.headers.enable=true
Thx in advance!!!
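One thing that might be worth checking, given the configs above: the source writes values with StringConverter, while the sink reads them with JsonConverter. As far as I know, JsonConverter defaults to schemas.enable=true, and in that mode it expects every message to be a JSON object with exactly a "schema" field and a "payload" field. If the topic holds raw log lines, conversion would likely fail on the sink side, and with errors.tolerance=all those records would be skipped and routed to the dead letter queue topic (tdlatdq) rather than stopping the task, which could explain seeing no errors and no rows. This is a guess, not a confirmed diagnosis. Below is a minimal Python sketch for testing whether a message value copied from the topic has the envelope shape; the function name is_schema_envelope and the sample values are made up for illustration.

```python
import json


def is_schema_envelope(raw: bytes) -> bool:
    """Return True if raw looks like a JsonConverter schema envelope.

    Hypothetical helper: it only checks the outer shape, a JSON object
    whose keys are exactly "schema" and "payload". It does not validate
    the schema itself.
    """
    try:
        doc = json.loads(raw.decode("utf-8"))
    except ValueError:
        # Covers both invalid UTF-8 and invalid JSON (e.g. a plain log line).
        return False
    return isinstance(doc, dict) and set(doc.keys()) == {"schema", "payload"}


# Made-up sample values, for illustration only.
enveloped = b'{"schema": {"type": "string", "optional": false}, "payload": "a log line"}'
plain_line = b"2023-01-01 12:00:00 INFO service started"
```

If the values in logfiledest look like plain_line rather than enveloped, it may be worth looking at what landed in tdlatdq and making the sink's value converter match what the source actually wrote.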