Hi
I am using the Confluent Cassandra Sink Connector to write Kafka data into Cassandra. I'm running Cassandra in a Docker container.
Here is my connector configuration:
{
"connector.class":"io.confluent.connect.cassandra.CassandraSinkConnector",
"tasks.max":30,
"topics.regex":"Test.kafka_TEST.*",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"cassandra.contact.points":"<cassandra_ip>",
"cassandra.local.datacenter":"datacenter1",
"cassandra.security":"PASSWORD",
"cassandra.username":"cassandra",
"cassandra.password":"cassandra_password",
"cassandra.keyspace":"test",
"cassandra.keyspace.create.enabled":"false",
"confluent.topic.bootstrap.servers":"<bootstrap_server>:9092",
"confluent.topic.replication.factor":1,
"cassandra.write.mode":"Insert",
"cassandra.table.manage.enabled": "true",
"flush.size": "1"
}
However, the worker tasks failed with the following error:
{
"id": 29,
"state": "FAILED",
"worker_id": "kafka-connect:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.IllegalArgumentException: Invalid CQL form [Test.kafka_TEST.Procedure]: needs double quotes\n\tat com.datastax.oss.driver.shaded.guava.common.base.Preconditions.checkArgument(Preconditions.java:216)\n\tat com.datastax.oss.driver.api.core.CqlIdentifier.fromCql(CqlIdentifier.java:79)\n\tat com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata.getTable(KeyspaceMetadata.java:56)\n\tat io.confluent.connect.cassandra.CassandraSessionImpl.tableMetadata(CassandraSessionImpl.java:83)\n\tat io.confluent.connect.cassandra.CassandraSessionImpl.createInsertPreparedStatement(CassandraSessionImpl.java:154)\n\tat io.confluent.connect.cassandra.CassandraSessionImpl.access$300(CassandraSessionImpl.java:42)\n\tat io.confluent.connect.cassandra.CassandraSessionImpl$2.apply(CassandraSessionImpl.java:192)\n\tat io.confluent.connect.cassandra.CassandraSessionImpl$2.apply(CassandraSessionImpl.java:189)\n\tat java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1133)\n\tat io.confluent.connect.cassandra.CassandraSessionImpl.insert(CassandraSessionImpl.java:187)\n\tat io.confluent.connect.cassandra.CassandraSinkTask.put(CassandraSinkTask.java:119)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)\n\t... 10 more\n"
}
Error
So, what causes this error? Is my connector configuration missing something?
Another question: Do we need to pre-create a table in Cassandra before starting the connector?
Edit: Sample messages from the topic Test.kafka_TEST.Procedure:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "Name"
},
{
"type": "string",
"optional": true,
"field": "City"
},
{
"type": "string",
"optional": true,
"field": "Email"
},
{
"type": "string",
"optional": true,
"name": "oryanmoshe.time.DateTimeString",
"field": "Procedure_start_Date"
},
{
"type": "string",
"optional": true,
"name": "oryanmoshe.time.DateTimeString",
"field": "Procedure_end_Date"
},
{
"type": "string",
"optional": true,
"field": "__table"
},
{
"type": "string",
"optional": true,
"field": "__deleted"
}
],
"optional": false,
"name": "Test.kafka_TEST.Procedure.Value"
},
"payload": {
"id": 96,
"Name": "Meredith",
"City": "Iqaluit",
"Email": "erat.vitae@porttitortellus.org",
"Procedure_start_Date": "2020-07-08 20:57:37",
"Procedure_end_Date": "2020-05-07 23:28:48",
"__table": "Procedure",
"__deleted": "false"
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "Name"
},
{
"type": "string",
"optional": true,
"field": "City"
},
{
"type": "string",
"optional": true,
"field": "Email"
},
{
"type": "string",
"optional": true,
"name": "oryanmoshe.time.DateTimeString",
"field": "Procedure_start_Date"
},
{
"type": "string",
"optional": true,
"name": "oryanmoshe.time.DateTimeString",
"field": "Procedure_end_Date"
},
{
"type": "string",
"optional": true,
"field": "__table"
},
{
"type": "string",
"optional": true,
"field": "__deleted"
}
],
"optional": false,
"name": "Test.kafka_TEST.Procedure.Value"
},
"payload": {
"id": 97,
"Name": "Abbot",
"City": "Belgrave",
"Email": "tincidunt@dolordolor.ca",
"Procedure_start_Date": "2020-12-03 23:42:27",
"Procedure_end_Date": "2021-11-26 20:23:33",
"__table": "Procedure",
"__deleted": "false"
}
}