CSV file with illegal characters for Avro / rename fields SMT

Hi,
I’m trying to load a bunch of CSV files into Kafka with the SpoolDirCsvSourceConnector, using Schema Registry + Avro.
Unfortunately the header names are things like “First-Name”, which clashes with Schema Registry and Avro, since a hyphen is not a legal character in an Avro field name.
I could replace the headers beforehand with sed or something, but I’d like to avoid that.
The customer uploads their file to an FTP server that the connector watches, and the data should end up in a SQL database. I also add some static fields.
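
To make the problem concrete, a made-up example of such a file (hypothetical values, semicolon-separated to match the csv.separator.char 59 in the config further down) would be something like this; the hyphens in the header names are exactly what Avro rejects:

    Nummer;Verkaeufer-Nachname;Verkaeufer-Vorname
    4711;Mustermann;Max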

My thoughts so far, though I’m not sure whether they’ll work, especially with Schema Registry and Avro:

  1. Can I use an SMT in the source connector to rename the fields before the schema is actually created? I’d also like to cast field types in the source connector already, so that a suitable schema is auto-generated.

  2. If not, I could skip the first row so the columns get generic names like “column01”, and then use the ReplaceField SMT on the JDBC sink connector to rename them, e.g. column01:firstname (rough sketch below).
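
Roughly what I have in mind for option 2 on the sink side, as an untested sketch (connector name, connection details and the generated column names are just placeholders):

{
  "name": "jdbc-sink-customer",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "csv-customer_data",
    "connection.url": "jdbc:sqlserver://<db-host>:1433;databaseName=<db>",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://<schema-registry>:8081",
    "auto.create": "true",
    "transforms": "renameColumns",
    "transforms.renameColumns.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameColumns.renames": "column01:first_name,column02:last_name"
  }
}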

Does anyone have an idea?

Example

{
  "name": "csv-test-01",
  "config": {
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://<schema-registry>:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://<schema-registry>:8081",
    "topic": "csv-customer_data",
    "input.path": "/data/customer",
    "finished.path": "/data/processed",
    "error.path": "/data/error",
    "input.file.pattern": "test.csv",
    "schema.generation.enabled": "true",
    "schema.generation.key.fields": "client_id",
    "csv.first.row.as.header": "false",
    "csv.skip.lines": 1,
    "cleanup.policy": "MOVE",
    "csv.ignore.quotations": "true",
    "csv.separator.char": "59",
    "halt.on.error": "false",
    "empty.poll.wait.ms": "3600000",
    "transforms": "insertTenant_id,insertCustomer_id,insertCustomer,ReplaceField,castTypes",
    "transforms.insertTenant_id.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertTenant_id.static.field": "tenant_id",
    "transforms.insertTenant_id.static.value": "0815",
    "transforms.insertCustomer_id.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertCustomer_id.static.field": "client_id",
    "transforms.insertCustomer_id.static.value": "1193638",
    "transforms.insertCustomer.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertCustomer.static.field": "client_name",
    "transforms.insertCustomer.static.value": "customername",
    "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.ReplaceField.renames": "Nummer:customer_id,Verkaeufer-Nachname:sales_person_last_name,Verkaeufer-Vorname:sales_person_first_name",
    "transforms.castTypes.type": "org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.castTypes.spec": "customer_id:int32,sales_person_last_name:string,sales_person_first_name:string"
  }
}

skip the first row so the columns get generic names like “column01”, and then use the ReplaceField SMT on the JDBC sink connector to rename them

I’d look at this route, but apply the SMT in the source connector here so that the data that goes to Kafka is as complete as possible.

I’d also check Kafka Connect File Pulse to see how it handles this kind of scenario.

Thank you Robin,
I’ll play around with option 2 then. So renaming fields should also work for source connectors, right? I’ve only seen sink use cases so far.

And thanks for pointing out the File Pulse connector. It seems even more powerful and promising, and I can already see another feature I could use regarding Azure Blob Storage.

I’ll report back on how it turns out.

So renaming fields should also work for source connectors, right? I’ve only seen sink use cases so far.

In general, a Single Message Transform can be used with either a source or a sink connector. There are some use cases where it only makes sense for one, but conceptually they are interchangeable.
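
For instance, the same rename stanza could be dropped unchanged into either a source or a sink connector config (field names here are just placeholders):

    "transforms": "rename",
    "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.rename.renames": "column01:first_name"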


@rmoff, sorry for the late feedback.
The File Pulse connector with renamed headers works fine so far.

But I do have an issue with the ConvertFilter. Has anyone used it before? I think this should work, if I understood the documentation correctly:

I’m trying to use it to convert a price to a float, like this:

           "filters"                            : "ParseLine,Converter1",
             ...
            "filters.Converter1.type"        : "io.streamthoughts.kafka.connect.filepulse.filter.ConvertFilter",
            "filters.Converter1.field"       : "purchase_price",
            "filters.Converter1.to"          : "FLOAT",

I’ve not tried the ConvertFilter myself - perhaps @fhussonnois can help?
