[bug] [kafka-connect-gcp-bigtable 2.0.26] No support for Kafka Connect logical data type `Date` in message value

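Repro (note: as written, the `logical` column below is `numeric`. With the JDBC source connector's default `numeric.mapping`, that column is emitted as the `Decimal` logical type, not `Date`. To exercise `Date`, declare the column as `date` and insert a date literal instead):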

docker-compose.yml:

---
# Minimal stack for reproducing the Bigtable sink issue: ZooKeeper-based Kafka,
# Schema Registry, Kafka Connect (with the Bigtable sink plugin mounted), the
# Bigtable emulator and a PostgreSQL source database.
services:
  zookeeper:
    # Pinned to the same Confluent Platform release as the other cp-* images so
    # the repro does not drift when `latest` moves.
    image: confluentinc/cp-zookeeper:7.5.7
    environment:
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"

  kafka:
    image: confluentinc/cp-kafka:7.5.7
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # kafka:29092 is used by other containers; localhost:9092 by the host.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker, so internal topics cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.7
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092
      SCHEMA_REGISTRY_DEBUG: "true"

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.5.7
    depends_on:
      schema-registry:
        condition: service_started
      # Wait for the healthcheck defined on the postgres service below.
      postgres:
        condition: service_healthy
      # The sink talks to the emulator via BIGTABLE_EMULATOR_HOST.
      bigtable:
        condition: service_started
    ports:
      - "8083:8083"
    volumes:
      # Unpacked Bigtable sink connector, placed on CONNECT_PLUGIN_PATH.
      # NOTE(review): the JDBC source connector (and PostgreSQL driver) used by
      # create_postgres_source.sh must also be on the plugin path; recent
      # cp-kafka-connect images may not bundle it — verify.
      - ./confluentinc-kafka-connect-gcp-bigtable-2.0.26:/usr/share/java/confluentinc-kafka-connect-gcp-bigtable-2.0.26
    environment:
      BIGTABLE_EMULATOR_HOST: bigtable:8086
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: "10000"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_PLUGIN_PATH: /usr/share/java/
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      # Message values are Avro, with schemas stored in Schema Registry.
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081

  bigtable:
    image: google/cloud-sdk:latest
    ports:
      - "127.0.0.1:8086:8086"
    entrypoint:
      - gcloud
      - beta
      - emulators
      - bigtable
      - start
      - --host-port=0.0.0.0:8086

  postgres:
    image: postgres:17
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: db
    volumes:
      # Creates the `logical` source table on first start.
      - ./logical.sql:/docker-entrypoint-initdb.d/logical.sql
    ports:
      - "127.0.0.1:5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user"]
      interval: 5s
      timeout: 5s
      retries: 5

restart.sh:

#!/bin/bash

# Tear the stack down (including volumes), bring it back up, register the
# PostgreSQL source and Bigtable sink connectors, then insert one test row.
#
# Environment:
#   MAX_WAIT_SECONDS - how long to wait for the Kafka Connect REST API
#                      (default 300).

# Fail fast like the connector-creation scripts; keep tracing for debugging.
set -euo pipefail
set -x

# docker compose and the helper scripts are resolved relative to this file,
# so the script works regardless of the caller's working directory.
cd "$(dirname "$0")"

docker compose down -v
docker compose up -d

# Kafka Connect is ready once its REST API returns an empty connector list.
# curl/jq failures while the worker is still starting simply mean "not yet".
max_wait_seconds=${MAX_WAIT_SECONDS:-300}
waited=0
until [[ "$(curl -s http://localhost:8083/connectors/ | jq -c . 2>/dev/null)" == '[]' ]]; do
    if (( waited >= max_wait_seconds )); then
        echo "Kafka Connect REST API not ready after ${max_wait_seconds}s" >&2
        exit 1
    fi
    sleep 1
    # Plain assignment rather than ((waited++)), which returns 1 at 0 under set -e.
    waited=$((waited + 1))
done

echo "restarted"
./create_postgres_source.sh | jq .
./create_confluent_bigtable_sink.sh | jq .

PGPASSWORD=password psql -h localhost -p 5432 -d db -U user \
    -c 'insert into logical(logical) values (1.234567890)'

logical.sql:

-- Source table polled by the JDBC source connector (incrementing on `id`).
-- NOTE(review): an unconstrained `numeric` column is presumably emitted by the
-- JDBC source as the Connect `Decimal` logical type, not `Date` — confirm
-- against the issue title; use `date` here to exercise `Date`.
CREATE TABLE logical (
    id serial PRIMARY KEY,
    logical numeric
);

create_confluent_bigtable_sink.sh:

#!/bin/bash

set -euo pipefail

# Register a Confluent GCP Bigtable sink connector with the Kafka Connect REST
# API at http://localhost:8083 and print the API response.
#
# Arguments:
#   $1 - connector name
#   $2 - source Kafka topic
#   $3 - target Bigtable table (table.name.format)
#   $4 - row.key.definition (may be empty)
# Environment:
#   BIGTABLE_CREDENTIALS_JSON - value for gcp.bigtable.credentials.json
#                               (defaults to the placeholder "FIXME")
function create_connector() {
    if [[ $# -ne 4 ]]; then
        echo "usage: create_connector <name> <topic> <table> <row_key_definition>" >&2
        return 1
    fi
    local connector_name=$1
    local source_topic=$2
    local target_table=$3
    local row_key_definition=$4
    local credentials_json=${BIGTABLE_CREDENTIALS_JSON:-FIXME}

    # Build the request with jq so argument values (notably a real credentials
    # JSON document) are escaped correctly instead of spliced into a string.
    # Declared separately so a jq failure is not masked by `local`'s status.
    local payload
    payload=$(jq -n \
        --arg name "$connector_name" \
        --arg topic "$source_topic" \
        --arg table "$target_table" \
        --arg row_key "$row_key_definition" \
        --arg credentials "$credentials_json" \
        '{
            name: $name,
            config: {
                "auto.create.column.families": "true",
                "auto.create.tables": "true",
                "confluent.license": "",
                "confluent.topic.bootstrap.servers": "kafka:29092",
                "confluent.topic.replication.factor": "1",
                "connector.class": "io.confluent.connect.gcp.bigtable.BigtableSinkConnector",
                "gcp.bigtable.credentials.json": $credentials,
                "gcp.bigtable.instance.id": "prawilny-dataflow",
                "gcp.bigtable.project.id": "unoperate-test",
                "insert.mode": "upsert",
                name: $name,
                "row.key.definition": $row_key,
                "row.key.delimiter": "#",
                "table.name.format": $table,
                "tasks.max": "1",
                topics: $topic
            }
        }')

    curl -sS \
        -X POST \
        -H "Content-Type: application/json" \
        --data "$payload" \
        http://localhost:8083/connectors
}

create_connector confluent_postgres postgres_logical postgres_confluent_table ""

create_postgres_source.sh:

#!/bin/bash

set -euo pipefail

# Register a Confluent JDBC source connector with the Kafka Connect REST API
# at http://localhost:8083 and print the API response. It polls the PostgreSQL
# table `logical` (incrementing on `id`) into topic `postgres_logical`
# (topic.prefix + table name). The SMT chain copies `id` from the value into
# the key, then unwraps it so the record key is the bare `id`.
#
# Arguments:
#   $1 - connector name
function create_connector() {
    if [[ $# -ne 1 ]]; then
        echo "usage: create_connector <name>" >&2
        return 1
    fi
    local connector_name=$1

    # Build the request with jq so the name is JSON-escaped. The jq program is
    # single-quoted, so `$Key` in the class name is not expanded by the shell,
    # and jq only interpolates `\(...)` inside string literals.
    local payload
    payload=$(jq -n --arg name "$connector_name" '{
        name: $name,
        config: {
            name: $name,
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "tasks.max": "1",
            "connection.url": "jdbc:postgresql://postgres:5432/db",
            "connection.user": "user",
            "connection.password": "password",
            "table.whitelist": "logical",
            mode: "incrementing",
            "incrementing.column.name": "id",
            "topic.prefix": "postgres_",
            "poll.interval.ms": "1000",
            "batch.max.rows": "1000",
            "errors.tolerance": "all",
            "errors.log.enable": "true",
            "errors.log.include.messages": "true",
            transforms: "createKey,extractInt",
            "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
            "transforms.createKey.fields": "id",
            "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
            "transforms.extractInt.field": "id"
        }
    }')

    curl -sS \
        -X POST \
        -H "Content-Type: application/json" \
        --data "$payload" \
        http://localhost:8083/connectors
}

create_connector postgres

The repro depends on:

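  • PostgreSQL client (`psql`)
  • bash
  • curl
  • docker (with the compose plugin)
  • jq
  • the Bigtable sink connector jars, unpacked into `./confluentinc-kafka-connect-gcp-bigtable-2.0.26` (docker-compose.yml mounts this directory into the Connect plugin path)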

Also, the sink will complain about the `FIXME` placeholder for the GCP credentials, so provide a value in the correct (service-account JSON) shape. I don't remember whether it works with invalid credentials; you can try starting with `integration/fixtures/fake-service-account.json` from the google/oauth2l GitHub repository (commit 41b41476bdc146b1bb25c08a6ad1cb2468205fff).