Kafka Connect - Avro Serialisation is not working for a CSV file (FsSourceConnector)

Hi,

I am using the FsSourceConnector Kafka connector to ingest CSV files into a Kafka topic.
I am using confluentinc/cp-helm-charts with a custom-built Docker image for Kafka Connect (the FsSourceConnector jar added).
The prerequisites, Kafka Connect details, and Kafka connector details are below.

Problem Statement:

The Kafka connector below is working and I am able to ingest the CSV into a Kafka topic as string values. My goal is to Avro-serialise the CSV data and store it in the topic. I am not sure what serialisation configuration is missing from my Connect/connector properties.

Prerequisites:

Before running the Kafka connector:

I placed the CSV file in a directory on the Kafka Connect pod and created a schema in Schema Registry for the CSV (an example registration request is shown below).
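For reference, the schema was registered with a request roughly like this (subject name and Schema Registry URL match my setup; the escaped schema string is the same one used in the connector config further down):

curl -X POST http://test-cp-schema-registry:8081/subjects/sampledata-value/versions \
    -H 'Content-Type: application/vnd.schemaregistry.v1+json' \
    -d '{"schema": "{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}"}'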

Below are the Kafka Connect (Helm values) details:

cp-control-center:
  enabled: false

cp-kafka:
  enabled: true

cp-kafka-rest:
  enabled: false

cp-ksql-server:
  enabled: false

cp-schema-registry:
  enabled: true

cp-zookeeper:
  enabled: true

cp-kafka-connect:
  replicaCount: 1

  image: localhost:5000/kc
  imageTag: v1
  imagePullPolicy: Always

  servicePort: 8083

  configurationOverrides:   
    "key.converter": "io.confluent.connect.avro.AvroConverter"
    "key.converter.schema.registry.url": "test-cp-schema-registry:8081"   
    "value.converter": "io.confluent.connect.avro.AvroConverter"
    "value.converter.schema.registry.url": "test-cp-schema-registry:8081"   
    "key.converter.schemas.enable": "false"
    "value.converter.schemas.enable": "false"
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"  
    "use.latest.version": "true"
    "auto.register.schemas": "false"
    "auto.create.topics": "false"   
    "config.storage.replication.factor": "1"
    "offset.storage.replication.factor": "1"
    "status.storage.replication.factor": "1"
    "plugin.path": "/usr/share/java,/usr/share/confluent-hub-components,/etc/kafka-connect/jars"


  heapOptions: "-Xms5g -Xmx10g"

  customEnv:
    KAFKA_JMX_HOSTNAME: "127.0.0.1"
    
  kafka:
    bootstrapServers: "test-cp-kafka-headless:9092"

  cp-schema-registry:
    url: "test-cp-schema-registry:8081"
    

  fullnameOverride: test
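
For completeness, the chart is deployed with the standard Helm workflow, roughly as follows (release name and values file name reflect my setup):

helm repo add confluentinc https://confluentinc.github.io/cp-helm-charts/
helm install test confluentinc/cp-helm-charts -f values.yaml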

Below are the Kafka connector details:

curl -X POST \
    http://localhost:8083/connectors \
    -H 'Content-Type:application/json' \
    -d '{
        "name": "sample",
        "config": {
        "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",           
        "tasks.max": "1",
        "fs.uris": "/home/appuser/csv",
        "topic": "sampledata",
        "use.latest.version": "true",
        "auto.register.schemas": "false",
        "poll.interval.ms": "10000",
        "auto.create.topics": "false",
        "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
        "policy.batch_size": "0",
        "policy.recursive": "true",  
        "policy.regexp": "^*.csv$",   
        "policy.resume.on.error": "false",   
        "key.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
        "key.enhanced.avro.schema.support": "true",
        "key.converter": "io.confluent.connect.avro.AvroConverter",   
        "value.converter.schema.registry.url": "http://test-cp-schema-registry:8081",
        "value.enhanced.avro.schema.support": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",       
        "file_reader.delimited.settings.format.quote": "\"",
        "file_reader.delimited.settings.escape_unquoted": "false",  
        "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.CsvFileReader",
        "file_reader.delimited.compression.type": "none",
        "file_reader.delimited.settings.schema.avro": "{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}",
        "file_reader.delimited.settings.delimiter_detection": "false",
        "file_reader.delimited.compression.concatenated": "true",
        "file_reader.delimited.settings.format.comment": "#",
        "file_reader.delimited.settings.format.quote_escape": "\"",
        "file_reader.delimited.settings.format.delimiter": ",",
        "file_reader.encryption.passphrase": "",
        "file_reader.delimited.settings.max_chars_per_column": "4096",
        "file_reader.delimited.settings.line_separator_detection": "false", 
        "file_reader.delimited.settings.format.line_separator": "\n",
        "file_reader.delimited.settings.max_columns": "512",
        "file_reader.encryption.type": "NONE",
        "file_reader.delimited.settings.header": "true",
        "file_reader.delimited.settings.ignore_leading_whitespaces": "true",   
        "file_reader.delimited.settings.rows_to_skip": "0",   
        "file_reader.batch_size": "0",
        "file_reader.encryption.secret": ""
    }
}'
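
After creating the connector, its state can be checked via the Connect REST API (connector name as in the request above):

curl http://localhost:8083/connectors/sample/status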

CSV file:

c1,c2,c3
abc,def,ghi
jkl,mno,pqr
stu,wvy,xyz
x1,x2,x3

Schema in Schema Registry:


{"subject":"sampledata-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"sampledata\",\"namespace\":\"default\",\"fields\":[{\"name\":\"c1\",\"type\":\"string\"},{\"name\":\"c2\",\"type\":\"string\"},{\"name\":\"c3\",\"type\":\"string\"}]}"}

Data in Topic:

/bin/kafka-console-consumer --topic sampledata --from-beginning --bootstrap-server cef-cp-kafka-headless:9092
abcdefghi
jklmnopqr
stuwvyxyz
x1x2x3

kafka-console-consumer shows the data as plain strings only.

Given that you've set "value.converter": "io.confluent.connect.avro.AvroConverter", perhaps you should be using kafka-avro-console-consumer. The plain kafka-console-consumer does not deserialise Avro; it prints the raw bytes, so even correctly Avro-encoded records show up with the field values run together as strings.
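
For example, something like this (the binary path, bootstrap server, and Schema Registry URL follow the setup above; adjust for your image):

/bin/kafka-avro-console-consumer --topic sampledata --from-beginning \
    --bootstrap-server test-cp-kafka-headless:9092 \
    --property schema.registry.url=http://test-cp-schema-registry:8081

This consumer deserialises the Avro payload using the registered schema, so the records should print as JSON with the c1/c2/c3 fields instead of the concatenated strings shown above.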