Hi,
I'm trying to load a bunch of CSV files into Kafka with the SpoolDirCsvSourceConnector, using Schema Registry and Avro.
Unfortunately, the header names look like “First-Name”, which clashes with Schema Registry and Avro (Avro field names may only contain letters, digits and underscores, so hyphens are not allowed).
I could replace the headers beforehand with sed or something similar, but I would like to avoid that.
The customer will upload their file to an FTP server directory that the connector watches, and the data should end up in a SQL database. I also add some static fields.
These are my ideas, but I'm not sure whether they work, especially with Schema Registry and Avro:
- Can I use an SMT in the source connector to rename these header fields before the schema is actually created? I would also like to cast field types in the source connector already, so that a suitable schema is auto-created.
- If not, I could skip the first row so that the columns get names like “column01”, and then use the ReplaceField SMT on the JDBC sink connector to rename them, e.g.
column01:firstname
etc.
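As far as I understand, Kafka Connect applies a source connector's SMTs before the converter serializes the record, so the Avro schema registered in Schema Registry should reflect the renamed and cast fields rather than the raw header names. A minimal, untested sketch of option 1 is below. It assumes the header row is used for schema generation and reuses the header names from my config below (`Nummer`, `Verkaeufer-Nachname`, `Verkaeufer-Vorname`). Only the relevant keys are shown; the InsertField transforms from my config would stay in front of these in the `transforms` list.

```json
{
  "csv.first.row.as.header": "true",
  "schema.generation.enabled": "true",
  "transforms": "ReplaceField,castTypes",
  "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.ReplaceField.renames": "Nummer:customer_id,Verkaeufer-Nachname:sales_person_last_name,Verkaeufer-Vorname:sales_person_first_name",
  "transforms.castTypes.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.castTypes.spec": "customer_id:int32"
}
```

Transforms run in the order listed, so the cast refers to the already renamed field `customer_id`.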
Does anyone have an idea?
Example source connector config:
{
"name": "csv-test-01",
"config": {
"connector.class":"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://<schema-registry>:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://<schema-registry>:8081",
"topic":"csv-customer_data",
"input.path":"/data/customer",
"finished.path":"/data/processed",
"error.path":"/data/error",
"input.file.pattern": "test.csv",
"schema.generation.enabled":"true",
"schema.generation.key.fields":"client_id",
"csv.first.row.as.header":"false",
"csv.skip.lines":1,
"cleanup.policy":"MOVE",
"csv.ignore.quotations":"true",
"csv.separator.char":"59",
"halt.on.error":"false",
"empty.poll.wait.ms":"3600000",
"transforms":"insertTenant_id,insertCustomer_id,insertCustomer,ReplaceField,castTypes",
"transforms.insertTenant_id.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTenant_id.static.field":"tenant_id",
"transforms.insertTenant_id.static.value":"0815",
"transforms.insertCustomer_id.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertCustomer_id.static.field":"client_id",
"transforms.insertCustomer_id.static.value":"1193638",
"transforms.insertCustomer.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertCustomer.static.field":"client_name",
"transforms.insertCustomer.static.value":"customername",
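"transforms.ReplaceField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.renames": "Nummer:customer_id,Verkaeufer-Nachname:sales_person_last_name,Verkaeufer-Vorname:sales_person_first_name",
"transforms.castTypes.type":"org.apache.kafka.connect.transforms.Cast$Value",
"transforms.castTypes.spec":"customer_id:int32,sales_person_last_name:string,sales_person_first_name:string"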
}
}
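For option 2, the renaming would happen on the sink side instead. A minimal, untested sketch of the JDBC sink, assuming the Confluent JDBC sink connector (`io.confluent.connect.jdbc.JdbcSinkConnector`) and generated column names like `column01` as described above. The connector name `jdbc-sink-customer` and the transform alias `renameColumns` are hypothetical, and the connection URL is a placeholder in the same style as the Schema Registry URL.

```json
{
  "name": "jdbc-sink-customer",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "csv-customer_data",
    "connection.url": "jdbc:<database>://<host>:<port>/<db>",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://<schema-registry>:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://<schema-registry>:8081",
    "auto.create": "true",
    "transforms": "renameColumns",
    "transforms.renameColumns.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameColumns.renames": "column01:firstname"
  }
}
```

The `renames` list would be extended with one `columnNN:target_name` pair per CSV column.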