Hello,
I think I encountered a bug while evaluating the Google Cloud Bigtable Sink Connector in a Docker Compose setup based on https://github.com/confluentinc/cp-all-in-one/blob/dd2eb847f183e65b6a1cca4045649ba0d48cf51d/cp-all-in-one-community/docker-compose.yml.
The sink connector throws an exception when it encounters a logical Date in the record value:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:624)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:342)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:242)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:211)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: class java.util.Date cannot be cast to class java.lang.Integer (java.util.Date and java.lang.Integer are in module java.base of loader 'bootstrap')
at io.confluent.connect.bigtable.client.BufferedWriter.getColumnValueFromField(BufferedWriter.java:271)
at io.confluent.connect.bigtable.client.BufferedWriter.addStructRecordWriteToBatch(BufferedWriter.java:204)
at io.confluent.connect.bigtable.client.BufferedWriter.addWriteToBatch(BufferedWriter.java:86)
at io.confluent.connect.bigtable.client.UpsertWriter.lambda$flush$0(UpsertWriter.java:70)
at java.base/java.util.HashMap.forEach(HashMap.java:1337)
at io.confluent.connect.bigtable.client.UpsertWriter.flush(UpsertWriter.java:68)
at io.confluent.connect.bigtable.BaseBigtableSinkTask.lambda$put$2(BaseBigtableSinkTask.java:104)
at java.base/java.util.HashMap.forEach(HashMap.java:1337)
at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:593)
... 11 more
The problem occurs with all of io.confluent.connect.json.JsonSchemaConverter, io.confluent.connect.avro.AvroConverter, and io.confluent.connect.protobuf.ProtobufConverter, each configured to use the Schema Registry from the same Docker Compose file. The stack trace above is identical for all of them.
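For what it's worth, the ClassCastException can be reproduced with the Connect data API alone, without any converter involved: for the org.apache.kafka.connect.data.Date logical type the struct field holds a java.util.Date even though the physical Schema.Type is INT32, so a cast that looks only at the type fails. A minimal sketch (the class name is mine, just for illustration):

import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class LogicalDateCastRepro {
    public static void main(String[] args) {
        // Value schema equivalent to the one registered in Schema Registry (see below):
        // logical_date is INT32 with the logical name org.apache.kafka.connect.data.Date.
        Schema valueSchema = SchemaBuilder.struct().name("logical")
                .field("id", Schema.INT32_SCHEMA)
                .field("logical_date", Date.builder().optional().build())
                .build();

        // 2025-01-17T00:00:00Z, i.e. the row from the logical table.
        Struct value = new Struct(valueSchema)
                .put("id", 1)
                .put("logical_date", new java.util.Date(1737072000000L));

        // The Connect API hands the field over as java.util.Date, so casting
        // based on Schema.Type.INT32 alone fails exactly like in the stack trace:
        Integer days = (Integer) value.get("logical_date"); // ClassCastException
        System.out.println(days);
    }
}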
Sink connector config:
{
  "name": "confluent_postgres",
  "config": {
    "connector.class": "io.confluent.connect.gcp.bigtable.BigtableSinkConnector",
    "row.key.definition": "",
    "table.name.format": "postgres_confluent_table",
    "confluent.topic.bootstrap.servers": "kafka:29092",
    "gcp.bigtable.credentials.path": "/gcp_key.json",
    "tasks.max": "1",
    "topics": "postgres_logical",
    "gcp.bigtable.project.id": "unoperate-test",
    "confluent.license": "",
    "row.key.delimiter": "#",
    "confluent.topic.replication.factor": "1",
    "name": "confluent_postgres",
    "gcp.bigtable.instance.id": "prawilny-dataflow",
    "auto.create.tables": "true",
    "auto.create.column.families": "true",
    "insert.mode": "upsert"
  },
  "tasks": [
    {
      "connector": "confluent_postgres",
      "task": 0
    }
  ],
  "type": "sink"
}
Source connector config (confluentinc-kafka-connect-jdbc-10.8.0):
{
  "name": "postgres",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "incrementing.column.name": "id",
    "errors.log.include.messages": "true",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "connection.password": "password",
    "tasks.max": "1",
    "transforms": "createKey,extractInt",
    "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "batch.max.rows": "1000",
    "table.whitelist": "logical",
    "mode": "incrementing",
    "topic.prefix": "postgres_",
    "transforms.extractInt.field": "id",
    "connection.user": "user",
    "transforms.createKey.fields": "id",
    "poll.interval.ms": "1000",
    "name": "postgres",
    "errors.tolerance": "all",
    "connection.url": "jdbc:postgresql://postgres:5432/db",
    "errors.log.enable": "true"
  },
  "tasks": [
    {
      "connector": "postgres",
      "task": 0
    }
  ],
  "type": "source"
}
Database content (select * from logical;):
id | logical_date
----+--------------
1 | 2025-01-17
Database schema:
CREATE TABLE logical (
    id serial PRIMARY KEY,
    logical_date date
);
Schema in Schema Registry (curl http://localhost:8081/schemas/ids/1 | jq .schema -r | jq):
{
  "type": "object",
  "title": "logical",
  "properties": {
    "id": {
      "type": "integer",
      "connect.index": 0,
      "connect.type": "int32"
    },
    "logical_date": {
      "connect.index": 1,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "type": "integer",
          "title": "org.apache.kafka.connect.data.Date",
          "connect.version": 1,
          "connect.type": "int32"
        }
      ]
    }
  }
}
From looking at the Kafka Connect source code, it looks as if the connector does not take org.apache.kafka.connect.data.Schema#name() into account and only uses org.apache.kafka.connect.data.Schema#type().
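For comparison, here is a rough sketch of the kind of handling I would expect in getColumnValueFromField (the method name comes from the stack trace; the code below is purely illustrative and not the connector's actual implementation): check Schema#name() for the logical types first and convert with the corresponding Connect helpers, only falling back to the physical type when no logical name is set.

import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;

// Illustrative only: map a struct field to the value to be written,
// taking logical schema names into account before the physical type.
public class LogicalAwareMapping {
    static Object columnValueFor(Schema fieldSchema, Object value) {
        if (value == null) {
            return null;
        }
        String logicalName = fieldSchema.name();
        if (Date.LOGICAL_NAME.equals(logicalName)) {
            // INT32 days since the epoch, carried in-memory as java.util.Date.
            return Date.fromLogical(fieldSchema, (java.util.Date) value);
        }
        if (Time.LOGICAL_NAME.equals(logicalName)) {
            return Time.fromLogical(fieldSchema, (java.util.Date) value);
        }
        if (Timestamp.LOGICAL_NAME.equals(logicalName)) {
            return Timestamp.fromLogical(fieldSchema, (java.util.Date) value);
        }
        // Only now is it safe to switch on the physical type.
        switch (fieldSchema.type()) {
            case INT32:
                return (Integer) value;
            default:
                return value;
        }
    }
}

Decimal (org.apache.kafka.connect.data.Decimal, carried in-memory as java.math.BigDecimal) would presumably need analogous treatment.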
I can provide a reproducer in the form of a docker-compose.yml and related scripts if needed.