Hi,
I am using the FsSourceConnector Kafka connector to ingest CSV files into a Kafka topic.
I am using confluentinc/cp-helm-charts with a custom-built Docker image for Kafka Connect (the FsSourceConnector jar added).
The prerequisites, Kafka Connect details, and Kafka connector details are listed below.
Problem Statement:
The Kafka connector below works, and I am able to ingest the CSV into a Kafka topic as a string value. My goal is to Avro-serialise the CSV data before storing it in the topic, but I am not sure what serialisation configuration is missing in my Connect/connector properties.
Prerequisites:
Before running the Kafka connector, I placed the CSV file in a directory on the Kafka Connect pod and created a schema in Schema Registry for the CSV.
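For reference, the schema was registered with a request along these lines (reconstructed from the registry output shown at the end; the subject name follows the <topic>-value convention):

curl -X POST http://test-cp-schema-registry:8081/subjects/sampledata-value/versions \
  -H 'Content-Type: application/vnd.schemaregistry.v1+json' \
  -d '{"schema": "{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}"}'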
Below are the Kafka Connect details:
cp-control-center:
  enabled: false
cp-kafka:
  enabled: true
cp-kafka-rest:
  enabled: false
cp-ksql-server:
  enabled: false
cp-schema-registry:
  enabled: true
cp-zookeeper:
  enabled: true
cp-kafka-connect:
  replicaCount: 1
  image: localhost:5000/kc
  imageTag: v1
  imagePullPolicy: Always
  servicePort: 8083
  configurationOverrides:
    "key.converter": "io.confluent.connect.avro.AvroConverter"
    "key.converter.schema.registry.url": "test-cp-schema-registry:8081"
    "value.converter": "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url": "test-cp-schema-registry:8081"
    "key.converter.schemas.enable": "false"
    "value.converter.schemas.enable": "false"
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
    "use.latest.version": "true"
    "auto.register.schemas": "false"
    "auto.create.topics": "false"
    "config.storage.replication.factor": "1"
    "offset.storage.replication.factor": "1"
    "status.storage.replication.factor": "1"
    "plugin.path": "/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka-connect/jars"
  heapOptions: "-Xms5g -Xmx10g"
  customEnv:
    KAFKA_JMX_HOSTNAME: "127.0.0.1"
  kafka:
    bootstrapServers: "test-cp-kafka-headless:9092"
  cp-schema-registry:
    url: "test-cp-schema-registry:8081"
fullnameOverride: test
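To double-check that the worker picked up these converter overrides, the Connect pod's startup log can be inspected, since Kafka Connect prints its worker configuration on startup (the deployment name here is an assumption derived from fullnameOverride: test):

kubectl logs deploy/test-cp-kafka-connect | grep -i "converter"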
Below are the Kafka connector details:
curl -X POST \
  http://localhost:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "sample",
  "config": {
    "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
    "tasks.max": "1",
    "fs.uris": "/home/appuser/csv",
    "topic": "sampledata",
    "use.latest.version": "true",
    "auto.register.schemas": "false",
    "poll.interval.ms": "10000",
    "auto.create.topics": "false",
    "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
    "policy.batch_size": "0",
    "policy.recursive": "true",
    "policy.regexp": "^*.csv$",
    "policy.resume.on.error": "false",
    "key.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
    "key.enhanced.avro.schema.support": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
    "value.enhanced.avro.schema.support": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "file_reader.delimited.settings.format.quote": "\"",
    "file_reader.delimited.settings.escape_unquoted": "false",
    "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.CsvFileReader",
    "file_reader.delimited.compression.type": "none",
    "file_reader.delimited.settings.schema.avro": "{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}",
    "file_reader.delimited.settings.delimiter_detection": "false",
    "file_reader.delimited.compression.concatenated": "true",
    "file_reader.delimited.settings.format.comment": "#",
    "file_reader.delimited.settings.format.quote_escape": "\"",
    "file_reader.delimited.settings.format.delimiter": ",",
    "file_reader.encryption.passphrase": "",
    "file_reader.delimited.settings.max_chars_per_column": "4096",
    "file_reader.delimited.settings.line_separator_detection": "false",
    "file_reader.delimited.settings.format.line_separator": "\n",
    "file_reader.delimited.settings.max_columns": "512",
    "file_reader.encryption.type": "NONE",
    "file_reader.delimited.settings.header": "true",
    "file_reader.delimited.settings.ignore_leading_whitespaces": "true",
    "file_reader.delimited.settings.rows_to_skip": "0",
    "file_reader.batch_size": "0",
    "file_reader.encryption.secret": ""
  }
}'
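After POSTing the configuration, the connector and task state can be checked through the standard Connect REST status endpoint (both report RUNNING in my case):

curl -s http://localhost:8083/connectors/sample/status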
CSV file:
c1,c2,c3
abc,def,ghi
jkl,mno,pqr
stu,wvy,xyz
x1,x2,x3
Schema in Schema Registry:
{"subject":"sampledata-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}"}
Data in Topic:
/bin/kafka-console-consumer --topic sampledata --from-beginning --bootstrap-server test-cp-kafka-headless:9092
abcdefghi
jklmnopqr
stuwvyxyz
x1x2x3
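For completeness, this is how I would expect to read the topic back once the values are Avro-serialised, using the Avro console consumer that ships with the Schema Registry image (binary path and host names are assumptions based on the chart defaults above):

kafka-avro-console-consumer --topic sampledata --from-beginning \
  --bootstrap-server test-cp-kafka-headless:9092 \
  --property schema.registry.url=http://test-cp-schema-registry:8081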