SftpCsvSourceConnector and Schema

Hello! I’m trying to set up an SftpCsvSourceConnector in my local environment and I’m having trouble setting a schema on the connector. This is what I’m trying to do:

 curl -i -X PUT -H "Accept:application/json" \
 -H "Content-Type:application/json" http://localhost:8083/connectors/nc-csv-02/config \
 -d '{
	"tasks.max" : "1",
	"connector.class" : "io.confluent.connect.sftp.SftpCsvSourceConnector",
	"kafka.topic": "sftp-csv-00",
	"cleanup.policy":"NONE",
    "behavior.on.error":"IGNORE",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
	"input.path" : "/",
	"csv.separator.char" : 59,
	"finished.path" : "/finished",
	"error.path" : "/error",
	"schema.generation.key.fields" : "msisdn",
	"input.file.pattern" : ".*\\.dat",
	"schema.generation.enabled" : "false",
	"csv.first.row.as.header" : "true",
	"key.schema":"{\"fields\":[{\"default\":null,\"name\":\"msisdn\",\"type\":[\"null\",\"string\"]}],\"name\":\"NCKeySchema\",\"type\":\"record\"}",
	"value.schema":"{\"name\":\"NCPortabilityMovementEvent\",\"type\":\"record\",\"fields\":[{\"default\":null,\"name\":\"action\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"msisdn\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"previousNRN\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"newNRN\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"effectiveDate\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"referenceID\",\"type\":[\"null\",\"string\"]}]}",
	"sftp.username":"tester",
    "sftp.password":"password",
    "sftp.host":"192.168.1.2",
    "sftp.port":"22"
 }'

The exception I see in the worker task is:

org.apache.kafka.common.config.ConfigException: Invalid value com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "fields" (class com.github.jcustenborder.kafka.connect.utils.jackson.SchemaSerializationModule$Storage), not marked as ignorable (10 known properties: "defaultValue", "valueSchema", "doc", "type", "name", "keySchema", "version", "parameters", "isOptional", "fieldSchemas"])
 at [Source: (String)"{"fields":[{"default":null,"name":"msisdn","type":["null","string"]}],"name":"NCKeySchema","type":"record"}"; line: 1, column: 12] (through reference chain: com.github.jcustenborder.kafka.connect.utils.jackson.SchemaSerializationModule$Storage["fields"]) for configuration Could not read schema from 'key.schema'
	at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.readSchema(SftpSourceConnectorConfig.java:334)
	at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.<init>(SftpSourceConnectorConfig.java:117)
	at io.confluent.connect.sftp.source.SftpCsvSourceConnectorConfig.<init>(SftpCsvSourceConnectorConfig.java:156)
	at io.confluent.connect.sftp.SftpCsvSourceConnector.start(SftpCsvSourceConnector.java:44)
	at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:185)
	at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:210)
	at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:349)
	at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:332)
	at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:141)
	at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:118)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

The schemas I’m trying to use for the key and value are:


{
  "fields": [
    {
      "default": null,
      "name": "msisdn",
      "type": [
        "null",
        "string"
      ]
    }
  ],
  "name": "NCKeySchema",
  "type": "record"
}

and

{
    "name" : "NCPortabilityMovementEvent",
    "type" : "record",
    "fields" : [
        {
            "default" : null,
            "name" : "action",
            "type" : [
                "null",
                "string"
            ]
        },
        {
            "default" : null,
            "name" : "msisdn",
            "type" : [
                "null",
                "string"
            ]
        },
        {
            "default" : null,
            "name" : "previousNRN",
            "type" : [
                "null",
                "string"
            ]
        },
        {
            "default" : null,
            "name" : "newNRN",
            "type" : [
                "null",
                "string"
            ]
        },
        {
            "default" : null,
            "name" : "effectiveDate",
            "type" : [
                "null",
                "string"
            ]
        },
        {
            "default" : null,
            "name" : "referenceID",
            "type" : [
                "null",
                "string"
            ]
        }
    ]
}

What am I doing wrong here?

When I set schema.generation.enabled=true and removed key.schema and value.schema, the connector worked just fine, so something about specifying a schema explicitly is what breaks it.

Thanks @OneCricketeer for helping me out on this one. The schema I provided was an Avro schema, but what the connector actually requires is a Connect schema.

The Connect schema, as detailed in the CSV Source Connector documentation, looks something like this:


{
  "name": "com.example.users.User",
  "type": "STRUCT",
  "isOptional": false,
  "fieldSchemas": {
    "id": { "type": "INT64", "isOptional": false },
    "first_name": { "type": "STRING", "isOptional": true },
    "last_name": { "type": "STRING", "isOptional": true },
    "email": { "type": "STRING", "isOptional": true },
    "gender": { "type": "STRING", "isOptional": true },
    "ip_address": { "type": "STRING", "isOptional": true },
    "last_login": { "type": "STRING", "isOptional": true },
    "account_balance": {
      "name": "org.apache.kafka.connect.data.Decimal",
      "type": "BYTES",
      "version": 1,
      "parameters": { "scale": "2" },
      "isOptional": true
    },
    "country": { "type": "STRING", "isOptional": true },
    "favorite_color": { "type": "STRING", "isOptional": true }
  }
}

(When placed in the connector config it is collapsed to a single line with the inner quotes escaped, as in the original curl command.)
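Following that format, here is a sketch (untested, field names taken from the schemas in the question) of the key and value schemas rewritten as Connect schemas, with a small Python helper that collapses them into the single-line strings that key.schema and value.schema expect. The assumption here is that each nullable-string Avro union maps to a STRING field with isOptional set to true, and the record maps to a STRUCT:

```python
import json

# Connect-schema equivalent of the Avro key schema above: the record becomes
# a STRUCT, and a ["null", "string"] union becomes STRING with isOptional=true.
key_schema = {
    "name": "NCKeySchema",
    "type": "STRUCT",
    "isOptional": False,
    "fieldSchemas": {
        "msisdn": {"type": "STRING", "isOptional": True},
    },
}

# Connect-schema equivalent of the Avro value schema above; all six fields
# are nullable strings, so they share the same field schema.
value_schema = {
    "name": "NCPortabilityMovementEvent",
    "type": "STRUCT",
    "isOptional": False,
    "fieldSchemas": {
        field: {"type": "STRING", "isOptional": True}
        for field in ["action", "msisdn", "previousNRN", "newNRN",
                      "effectiveDate", "referenceID"]
    },
}

# json.dumps produces the single-line JSON to paste as the config value;
# embedding that string inside the connector-config JSON (as the curl
# command does) is what introduces the \" escapes.
print(json.dumps(key_schema))
print(json.dumps(value_schema))
```

The printed strings are what goes into "key.schema" and "value.schema" in the PUT body, in place of the Avro schemas.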
