[bug] [kafka-connect-gcp-bigtable 2.0.26] No support for logical data type `org.apache.kafka.connect.data.Date` in message value

Hello,
I think I encountered a bug while evaluating the Google Cloud Bigtable Sink Connector in a Docker Compose setup based on cp-all-in-one/cp-all-in-one-community/docker-compose.yml from the confluentinc/cp-all-in-one GitHub repository (commit dd2eb847f183e65b6a1cca4045649ba0d48cf51d).
The sink connector throws an exception when it encounters a logical Date in the message value:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:624)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:342)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:242)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:211)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: class java.util.Date cannot be cast to class java.lang.Integer (java.util.Date and java.lang.Integer are in module java.base of loader 'bootstrap')
        at io.confluent.connect.bigtable.client.BufferedWriter.getColumnValueFromField(BufferedWriter.java:271)
        at io.confluent.connect.bigtable.client.BufferedWriter.addStructRecordWriteToBatch(BufferedWriter.java:204)
        at io.confluent.connect.bigtable.client.BufferedWriter.addWriteToBatch(BufferedWriter.java:86)
        at io.confluent.connect.bigtable.client.UpsertWriter.lambda$flush$0(UpsertWriter.java:70)
        at java.base/java.util.HashMap.forEach(HashMap.java:1337)
        at io.confluent.connect.bigtable.client.UpsertWriter.flush(UpsertWriter.java:68)
        at io.confluent.connect.bigtable.BaseBigtableSinkTask.lambda$put$2(BaseBigtableSinkTask.java:104)
        at java.base/java.util.HashMap.forEach(HashMap.java:1337)
        at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:593)
        ... 11 more

The problem occurs with all three converters: io.confluent.connect.json.JsonSchemaConverter, io.confluent.connect.avro.AvroConverter, and io.confluent.connect.protobuf.ProtobufConverter, each configured to use the Schema Registry from the same Docker Compose file. The stack trace above is identical for all of them.

Sink connector config:

{
  "name": "confluent_postgres",
  "config": {
    "connector.class": "io.confluent.connect.gcp.bigtable.BigtableSinkConnector",
    "row.key.definition": "",
    "table.name.format": "postgres_confluent_table",
    "confluent.topic.bootstrap.servers": "kafka:29092",
    "gcp.bigtable.credentials.path": "/gcp_key.json",
    "tasks.max": "1",
    "topics": "postgres_logical",
    "gcp.bigtable.project.id": "unoperate-test",
    "confluent.license": "",
    "row.key.delimiter": "#",
    "confluent.topic.replication.factor": "1",
    "name": "confluent_postgres",
    "gcp.bigtable.instance.id": "prawilny-dataflow",
    "auto.create.tables": "true",
    "auto.create.column.families": "true",
    "insert.mode": "upsert"
  },
  "tasks": [
    {
      "connector": "confluent_postgres",
      "task": 0
    }
  ],
  "type": "sink"
}

Source connector config (confluentinc-kafka-connect-jdbc-10.8.0):

{
  "name": "postgres",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "incrementing.column.name": "id",
    "errors.log.include.messages": "true",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "connection.password": "password",
    "tasks.max": "1",
    "transforms": "createKey,extractInt",
    "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "batch.max.rows": "1000",
    "table.whitelist": "logical",
    "mode": "incrementing",
    "topic.prefix": "postgres_",
    "transforms.extractInt.field": "id",
    "connection.user": "user",
    "transforms.createKey.fields": "id",
    "poll.interval.ms": "1000",
    "name": "postgres",
    "errors.tolerance": "all",
    "connection.url": "jdbc:postgresql://postgres:5432/db",
    "errors.log.enable": "true"
  },
  "tasks": [
    {
      "connector": "postgres",
      "task": 0
    }
  ],
  "type": "source"
}

Database content (select * from logical;):

 id | logical_date
----+--------------
  1 | 2025-01-17

Database schema:

CREATE TABLE logical (
    id serial PRIMARY KEY,
    logical_date date
);

Schema in schema registry (curl http://localhost:8081/schemas/ids/1 | jq .schema -r | jq):

{
  "type": "object",
  "title": "logical",
  "properties": {
    "id": {
      "type": "integer",
      "connect.index": 0,
      "connect.type": "int32"
    },
    "logical_date": {
      "connect.index": 1,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "type": "integer",
          "title": "org.apache.kafka.connect.data.Date",
          "connect.version": 1,
          "connect.type": "int32"
        }
      ]
    }
  }
}

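From my reading of the Kafka Connect source code, it looks as if the connector does not take org.apache.kafka.connect.data.Schema#name() into account and relies only on org.apache.kafka.connect.data.Schema#type(). The logical Date schema has type INT32, so the connector appears to cast the value to java.lang.Integer, while the value it actually receives is a java.util.Date. That would explain the ClassCastException above.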
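To illustrate what I mean, here is a minimal sketch of the two dispatch strategies. It is not the connector's code, which I have not seen. FieldSchema, typeOnly and nameAware are hypothetical stand-ins I made up so the example runs on a plain JDK; in real Connect code the equivalents would be Schema#type(), Schema#name(), and org.apache.kafka.connect.data.Date.LOGICAL_NAME / Date.fromLogical.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Date;

class LogicalDateSketch {
    // Schema name Kafka Connect uses for its logical Date type
    // (it shows up as "title" in the Schema Registry output above).
    static final String DATE_LOGICAL_NAME = "org.apache.kafka.connect.data.Date";
    static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    // Hypothetical stand-in for the two Schema properties that matter here.
    static final class FieldSchema {
        final String type; // what Schema#type() would report, e.g. "INT32"
        final String name; // what Schema#name() would report, null for plain types

        FieldSchema(String type, String name) {
            this.type = type;
            this.name = name;
        }
    }

    // Type-only dispatch: assumes every INT32 field carries an Integer.
    static int typeOnly(FieldSchema schema, Object value) {
        if (!"INT32".equals(schema.type)) {
            throw new IllegalArgumentException("unsupported type: " + schema.type);
        }
        return (Integer) value; // ClassCastException if value is a java.util.Date
    }

    // Name-aware dispatch: checks the logical name before casting.
    static int nameAware(FieldSchema schema, Object value) {
        if (!"INT32".equals(schema.type)) {
            throw new IllegalArgumentException("unsupported type: " + schema.type);
        }
        if (DATE_LOGICAL_NAME.equals(schema.name)) {
            // Logical Date: java.util.Date at midnight UTC -> days since 1970-01-01.
            long millis = ((Date) value).getTime();
            return Math.toIntExact(Math.floorDiv(millis, MILLIS_PER_DAY));
        }
        return (Integer) value;
    }

    public static void main(String[] args) {
        FieldSchema dateField = new FieldSchema("INT32", DATE_LOGICAL_NAME);
        Date value = Date.from(
                LocalDate.of(2025, 1, 17).atStartOfDay(ZoneOffset.UTC).toInstant());

        System.out.println("name-aware: " + nameAware(dateField, value));
        try {
            typeOnly(dateField, value);
        } catch (ClassCastException e) {
            System.out.println("type-only failed: " + e.getMessage());
        }
    }
}
```

With the row from my table (2025-01-17), the type-only path fails with the same kind of ClassCastException as in the stack trace, while the name-aware path yields the day count that the int32 field is supposed to hold.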

I can provide a reproducer in the form of a docker-compose.yml and related scripts if needed.

P.S. I wanted to try Slack first, but the invitation bot seems broken: the Confluent Community signup page shows “Important: This page currently cannot accept signups. If you are the creator of this page, please visit your LaunchPass Dashboard” at the bottom.
