Kafka Cassandra sink Invalid CQL form needs double quotes

Hi

I am using the Confluent Cassandra Sink connector to write Kafka data into Cassandra. I’m running Cassandra in a Docker container.

Here is my connector configuration:

{
    "connector.class":"io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max":30,
    "topics.regex":"Test.kafka_TEST.*",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "cassandra.contact.points":"<cassandra_ip>",
    "cassandra.local.datacenter":"datacenter1",
    "cassandra.security":"PASSWORD",
    "cassandra.username":"cassandra",
    "cassandra.password":"cassandra_password",
    "cassandra.keyspace":"test",
    "cassandra.keyspace.create.enabled":"false",
    "confluent.topic.bootstrap.servers":"<bootstrap_server>:9092",
    "confluent.topic.replication.factor":1,
    "cassandra.write.mode":"Insert",
    "cassandra.table.manage.enabled": "true",
    "flush.size": "1"
}

But the worker tasks failed with the following error:

{
      "id": 29,
      "state": "FAILED",
      "worker_id": "kafka-connect:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.IllegalArgumentException: Invalid CQL form [Test.kafka_TEST.Procedure]: needs double quotes\n\tat com.datastax.oss.driver.shaded.guava.common.base.Preconditions.checkArgument(Preconditions.java:216)\n\tat com.datastax.oss.driver.api.core.CqlIdentifier.fromCql(CqlIdentifier.java:79)\n\tat com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata.getTable(KeyspaceMetadata.java:56)\n\tat io.confluent.connect.cassandra.CassandraSessionImpl.tableMetadata(CassandraSessionImpl.java:83)\n\tat io.confluent.connect.cassandra.CassandraSessionImpl.createInsertPreparedStatement(CassandraSessionImpl.java:154)\n\tat io.confluent.connect.cassandra.CassandraSessionImpl.access$300(CassandraSessionImpl.java:42)\n\tat io.confluent.connect.cassandra.CassandraSessionImpl$2.apply(CassandraSessionImpl.java:192)\n\tat io.confluent.connect.cassandra.CassandraSessionImpl$2.apply(CassandraSessionImpl.java:189)\n\tat 
java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1133)\n\tat io.confluent.connect.cassandra.CassandraSessionImpl.insert(CassandraSessionImpl.java:187)\n\tat io.confluent.connect.cassandra.CassandraSinkTask.put(CassandraSinkTask.java:119)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)\n\t... 10 more\n"
    }

So, what causes this error? Is my connector configuration missing anything?
Another question: do we need to pre-create the table in Cassandra before starting the connector?

Edit: Sample messages from the topic Test.kafka_TEST.Procedure:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "Name"
      },
      {
        "type": "string",
        "optional": true,
        "field": "City"
      },
      {
        "type": "string",
        "optional": true,
        "field": "Email"
      },
      {
        "type": "string",
        "optional": true,
        "name": "oryanmoshe.time.DateTimeString",
        "field": "Procedure_start_Date"
      },
      {
        "type": "string",
        "optional": true,
        "name": "oryanmoshe.time.DateTimeString",
        "field": "Procedure_end_Date"
      },
      {
        "type": "string",
        "optional": true,
        "field": "__table"
      },
      {
        "type": "string",
        "optional": true,
        "field": "__deleted"
      }
    ],
    "optional": false,
    "name": "Test.kafka_TEST.Procedure.Value"
  },
  "payload": {
    "id": 96,
    "Name": "Meredith",
    "City": "Iqaluit",
    "Email": "erat.vitae@porttitortellus.org",
    "Procedure_start_Date": "2020-07-08 20:57:37",
    "Procedure_end_Date": "2020-05-07 23:28:48",
    "__table": "Procedure",
    "__deleted": "false"
  }
}

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "Name"
      },
      {
        "type": "string",
        "optional": true,
        "field": "City"
      },
      {
        "type": "string",
        "optional": true,
        "field": "Email"
      },
      {
        "type": "string",
        "optional": true,
        "name": "oryanmoshe.time.DateTimeString",
        "field": "Procedure_start_Date"
      },
      {
        "type": "string",
        "optional": true,
        "name": "oryanmoshe.time.DateTimeString",
        "field": "Procedure_end_Date"
      },
      {
        "type": "string",
        "optional": true,
        "field": "__table"
      },
      {
        "type": "string",
        "optional": true,
        "field": "__deleted"
      }
    ],
    "optional": false,
    "name": "Test.kafka_TEST.Procedure.Value"
  },
  "payload": {
    "id": 97,
    "Name": "Abbot",
    "City": "Belgrave",
    "Email": "tincidunt@dolordolor.ca",
    "Procedure_start_Date": "2020-12-03 23:42:27",
    "Procedure_end_Date": "2021-11-26 20:23:33",
    "__table": "Procedure",
    "__deleted": "false"
  }
}

Hi @achyut, have you solved this issue? I’m getting the same error.

I think you should try using RegexRouter to remove or replace the periods in the topic name.
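
For example, something along these lines could be merged into the existing connector config. This is only a sketch: the alias renameTopic is a name I made up, and the regex assumes every matched topic starts with the Test.kafka_TEST. prefix and that whatever follows it contains no further periods.

```json
{
    "transforms": "renameTopic",
    "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.renameTopic.regex": "Test\\.kafka_TEST\\.(.*)",
    "transforms.renameTopic.replacement": "$1"
}
```

With this, a record from Test.kafka_TEST.Procedure should reach the sink with the topic name Procedure, so the table name no longer contains periods. If you would rather keep the prefix, a replacement such as "Test_kafka_TEST_$1" should work the same way.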

Identifiers cannot contain periods. See "Definitions" in the Apache Cassandra documentation.