Log file reader

We’re trying to ingest log files and load each line into a DB using a JDBC connector. I’m using SpoolDirLineDelimitedSourceConnector as the source connector and TeradataSinkConnector as the sink connector, with StringConverter and JsonConverter, but for some reason this configuration is not working: the data doesn’t show up in the sink DB. Does anyone know why, or have a few ideas?
Thx…

hey,

any obvious errors?
what does your config look like?

best,
michael

Hey Michael,
The strange thing is that there are no errors in the log file. I do see the payload in the Kafka topic populated from the source files, but the sink DB does not get populated (a quick way to verify the topic contents is shown after the configs below). I even tried creating the table there beforehand, but that didn’t help. Here are the configs:
Worker:

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply it to.
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# If we run multiple standalone workers on the same host machine, the following two configuration properties must be unique for each worker.
# Using the same file name for two workers will cause offset data to be deleted or overwritten with different values.
offset.storage.file.filename=/tmp/connect.offsets

# A list of URIs the REST API will listen on, in the format protocol://host:port,protocol2://host2:port. The protocol is either HTTP or HTTPS.
# You can specify hostname as 0.0.0.0 to bind to all interfaces, or leave hostname empty to bind to the default interface.
# The current default port is 8083 - HTTP://:8083
#listeners=

# Flush much faster than normal, which is useful for testing/debugging.
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java,/home/myuser/confluent-7.3.2/share/confluent-hub-components

Source Connector:
name=LineDelimSchemaSpoolDirTD
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirLineDelimitedSourceConnector
input.path=/home/myuser/kafka-connect/logdest
error.path=/home/myuser/kafka-connect/logdest/error
finished.path=/home/myuser/kafka-connect/logdest/finished
halt.on.error=false
topic=logfiledest
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
input.file.pattern=.*\.log
file.minimum.age.ms=100

Sink Connector:
name=teradata-log-sink
confluent.topic.bootstrap.servers=localhost:9092
confluent.topic.replication.factor=1
connector.class=io.confluent.connect.teradata.TeradataSinkConnector
tasks.max=1
topics=logfiledest
teradata.url=jdbc:teradata://hostname
teradata.database=sinkDBName
teradata.username=username
teradata.password=******
pk.mode=none
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
errors.tolerance=all
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.topic.name=tdlatdq
errors.deadletterqueue.context.headers.enable=true
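
For reference, the topic contents can be double-checked with the console consumer (assuming a local broker and the topic name above):

kafka-console-consumer --bootstrap-server localhost:9092 --topic logfiledest --from-beginning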

Thx in advance!!!

ok I see

is Kafka Connect running inside Docker containers?
did you check the Kafka Connect REST API? the status endpoint should be something like

http://$connect_host:8083/connectors/$your_connector_name/status
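
for example, with the default port and your source connector name, that would be:

curl http://localhost:8083/connectors/LineDelimSchemaSpoolDirTD/status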

best,
michael

Hi Michael,

This is just running on a compute instance, not in a container.
The response to the status request is coming back as a 404.

When I check the topic in Kafka I see the record:
{"schema":{"type":"string","optional":false},"payload":"1st line in my log file"}

But in order to be written to a DB table, I believe the value should be in a STRUCT format, something like this:
{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"msg"}],"optional":false,"name":"com.github.jcustenborder.kafka.connect.model.Value"},"payload":{"msg":"1st line in my log file"}}

If my assumption is correct, how do I transform the record into a STRUCT format like the one above?
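
From the Kafka Connect docs, it looks like the built-in HoistField single message transform might do this on the source connector. A sketch (the transform alias hoistToStruct is arbitrary; the field name msg matches the example above; the resulting struct would be unnamed rather than com.github.jcustenborder.kafka.connect.model.Value, but the shape should match):

# the alias "hoistToStruct" is arbitrary; "field" names the single struct field
transforms=hoistToStruct
transforms.hoistToStruct.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.hoistToStruct.field=msg

For the struct envelope to appear on the topic like that, the source's value converter would also need to be JsonConverter with value.converter.schemas.enable=true rather than StringConverter.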

I’m hoping someone has done something like this and can help…

Thx…

Hi @hjconfka

if you get a 404, something is wrong with the connector (it’s either not running or not registered with the worker)
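
you can also list the connectors the worker actually knows about (assuming the default port):

curl http://localhost:8083/connectors

if your connector names are missing from that list, they were never registered with the worker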

what’s the output of
http://$connect_host:8083/
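
a healthy worker normally answers the root path with a small JSON document along the lines of

{"version":"...","commit":"...","kafka_cluster_id":"..."}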

best,
michael

Error 404 Not Found

HTTP ERROR 404 Not Found

URI: /
STATUS: 404
MESSAGE: Not Found
SERVLET: -

Powered by Jetty:// 9.4.48.v20220622

Does Control Center have to be running? I’m not sure it is even installed. Doesn’t it come bundled with Confluent Platform 7.3.2?

seems like something is not running properly.
how did you start the components?
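
for reference, a standalone worker is normally started with the worker properties followed by the connector properties files, e.g. (file names here are placeholders):

connect-standalone worker.properties spooldir-source.properties teradata-sink.properties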