Hello!I’m trying to setup an SftpCSVSourceConnector in my local env and I’m having some trouble setting a schema to the connector. This is what I’m trying to do
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/nc-csv-02/config \
-d '{
"tasks.max" : "1",
"connector.class" : "io.confluent.connect.sftp.SftpCsvSourceConnector",
"kafka.topic": "sftp-csv-00",
"cleanup.policy":"NONE",
"behavior.on.error":"IGNORE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"input.path" : "/",
"csv.separator.char" : 59,
"finished.path" : "/finished",
"error.path" : "/error",
"schema.generation.key.fields" : "msisdn",
"input.file.pattern" : ".*\\.dat",
"schema.generation.enabled" : "false",
"csv.first.row.as.header" : "true",
"key.schema":"{\"fields\":[{\"default\":null,\"name\":\"msisdn\",\"type\":[\"null\",\"string\"]}],\"name\":\"NCKeySchema\",\"type\":\"record\"}",
"value.schema":"{\"name\":\"NCPortabilityMovementEvent\",\"type\":\"record\",\"fields\":[{\"default\":null,\"name\":\"action\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"msisdn\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"previousNRN\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"newNRN\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"effectiveDate\",\"type\":[\"null\",\"string\"]},{\"default\":null,\"name\":\"referenceID\",\"type\":[\"null\",\"string\"]}]}",
"sftp.username":"tester",
"sftp.password":"password",
"sftp.host":"192.168.1.2",
"sftp.port":"22"
}'
The exception I see in the worker task is
org.apache.kafka.common.config.ConfigException: Invalid value com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "fields" (class com.github.jcustenborder.kafka.connect.utils.jackson.SchemaSerializationModule$Storage), not marked as ignorable (10 known properties: "defaultValue", "valueSchema", "doc", "type", "name", "keySchema", "version", "parameters", "isOptional", "fieldSchemas"])
at [Source: (String)"{"fields":[{"default":null,"name":"msisdn","type":["null","string"]}],"name":"NCKeySchema","type":"record"}"; line: 1, column: 12] (through reference chain: com.github.jcustenborder.kafka.connect.utils.jackson.SchemaSerializationModule$Storage["fields"]) for configuration Could not read schema from 'key.schema'
at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.readSchema(SftpSourceConnectorConfig.java:334)
at io.confluent.connect.sftp.source.SftpSourceConnectorConfig.<init>(SftpSourceConnectorConfig.java:117)
at io.confluent.connect.sftp.source.SftpCsvSourceConnectorConfig.<init>(SftpCsvSourceConnectorConfig.java:156)
at io.confluent.connect.sftp.SftpCsvSourceConnector.start(SftpCsvSourceConnector.java:44)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:185)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:210)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:349)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:332)
at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:141)
at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:118)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
The schemas I’m trying to use for key and value are
{
"fields": [
{
"default": null,
"name": "msisdn",
"type": [
"null",
"string"
]
}
],
"name": "NCKeySchema",
"type": "record"
}
and
{
"name" : "NCPortabilityMovementEvent",
"type" : "record",
"fields" : [
{
"default" : null,
"name" : "action",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "msisdn",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "previousNRN",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "newNRN",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "effectiveDate",
"type" : [
"null",
"string"
]
},
{
"default" : null,
"name" : "referenceID",
"type" : [
"null",
"string"
]
}
]
}
What am I doing wrong here ?
I tried this with schema.generation.enabled=true
and removing the key.schema
and value.schema
the connector worked just fine.
Something about specifying a schema is messing it up