Repro:
docker-compose.yml:
---
# Local stack for the repro: ZooKeeper + Kafka + Schema Registry + Kafka
# Connect, a Bigtable emulator as the sink target and Postgres as the source.
services:
  zookeeper:
    # Pinned to the same Confluent Platform version as the other cp-* images
    # instead of the moving `latest` tag, so the repro stays reproducible.
    image: confluentinc/cp-zookeeper:7.5.7
    environment:
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"

  kafka:
    image: confluentinc/cp-kafka:7.5.7
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two advertised listeners: kafka:29092 is the address the other
      # services in this file connect to; localhost:9092 matches the port
      # published to the host above.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker, so the internal offsets topic cannot be replicated.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.7
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092
      SCHEMA_REGISTRY_DEBUG: "true"

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.5.7
    depends_on:
      - schema-registry
      - postgres
    ports:
      - "8083:8083"
    volumes:
      # Bigtable sink connector, unpacked next to this file; mounted under
      # CONNECT_PLUGIN_PATH so the worker can load it.
      - ./confluentinc-kafka-connect-gcp-bigtable-2.0.26:/usr/share/java/confluentinc-kafka-connect-gcp-bigtable-2.0.26
    environment:
      # Address of the `bigtable` emulator service defined below.
      BIGTABLE_EMULATOR_HOST: bigtable:8086
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: "10000"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_PLUGIN_PATH: /usr/share/java/
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081

  bigtable:
    # NOTE(review): still on a moving tag; consider pinning a specific
    # google/cloud-sdk version for reproducibility.
    image: google/cloud-sdk:latest
    ports:
      - "127.0.0.1:8086:8086"
    entrypoint:
      - gcloud
      - beta
      - emulators
      - bigtable
      - start
      - --host-port=0.0.0.0:8086

  postgres:
    image: postgres:17
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: db
    volumes:
      # Creates the `logical` table on first start of the container.
      - ./logical.sql:/docker-entrypoint-initdb.d/logical.sql
    ports:
      - "127.0.0.1:5432:5432"
    healthcheck:
      # -d db: without it pg_isready probes a database named after the user
      # ("user"), which does not exist and gets logged as a failed connection
      # on every check.
      test: ["CMD-SHELL", "pg_isready -U user -d db"]
      interval: 5s
      timeout: 5s
      retries: 5
restart.sh:
#!/bin/bash
# Recreates the compose stack from scratch, waits until the Kafka Connect REST
# API is up, registers the source and sink connectors and finally inserts one
# test row into Postgres.
#
# -e/-u/pipefail: abort on the first failing step (matching the create_*.sh
# scripts) instead of carrying on against a half-started stack.
# -x: trace every command.
set -euo pipefail
set -x

# -v additionally removes the stack's volumes, so each run starts clean.
docker compose down -v
docker compose up -d

# A freshly started Connect worker has no connectors, so its REST API answers
# with '[]'. Poll once per second until exactly that comes back; while the
# worker is still starting, curl yields nothing and diff reports a mismatch.
# curl -s: suppress the progress meter that would otherwise be printed on
# every iteration.
until diff <(echo '[]') <(curl -s http://localhost:8083/connectors/ | jq -rc .); do
  sleep 1
done
echo "restarted"

# Pretty-print the REST responses of both connector registrations.
./create_postgres_source.sh | jq .
./create_confluent_bigtable_sink.sh | jq .

# Insert a single row for the source connector to pick up.
echo 'insert into logical(logical) values (1.234567890)' \
  | PGPASSWORD=password psql -h localhost -p 5432 -d db -U user
logical.sql:
-- Table polled by the JDBC source connector: `id` is its incrementing column,
-- `logical` is a numeric column declared without precision or scale.
CREATE TABLE logical (
    id serial PRIMARY KEY,
    logical numeric
);
create_confluent_bigtable_sink.sh:
#!/bin/bash
set -euo pipefail

# Registers a Bigtable sink connector via the Kafka Connect REST API on
# localhost:8083 and prints the response body on stdout.
#
# Arguments:
#   $1  connector name
#   $2  source topic(s), sent as "topics"
#   $3  target table, sent as "table.name.format"
#   $4  row key definition, sent as "row.key.definition" (may be empty)
#
# Returns non-zero with a usage message when the argument count is wrong.
# The arguments are spliced into the JSON payload verbatim, so they must not
# contain characters that would need JSON escaping.
function create_connector() {
  # Explicit check: a bare `[[ ... ]]` would abort silently under `set -e`
  # and be skipped altogether if the function were called in a `||` context.
  if [[ $# -ne 4 ]]; then
    echo "usage: create_connector NAME TOPIC TABLE ROW_KEY_DEFINITION" >&2
    return 1
  fi

  local connector_name=$1
  local source_topic=$2
  local target_table=$3
  local row_key_definition=$4

  # -sS: no progress meter (the caller pipes the output), but still report
  # transport errors on stderr.
  curl \
    -sS \
    -X POST \
    -H "Content-Type: application/json" \
    http://localhost:8083/connectors \
    --data '{
      "name": "'"$connector_name"'",
      "config": {
        "auto.create.column.families": "true",
        "auto.create.tables": "true",
        "confluent.license": "",
        "confluent.topic.bootstrap.servers": "kafka:29092",
        "confluent.topic.replication.factor": "1",
        "connector.class": "io.confluent.connect.gcp.bigtable.BigtableSinkConnector",
        "gcp.bigtable.credentials.json": "FIXME",
        "gcp.bigtable.instance.id": "prawilny-dataflow",
        "gcp.bigtable.project.id": "unoperate-test",
        "insert.mode": "upsert",
        "name": "'"$connector_name"'",
        "row.key.definition": "'"$row_key_definition"'",
        "row.key.delimiter": "#",
        "table.name.format": "'"$target_table"'",
        "tasks.max": "1",
        "topics": "'"$source_topic"'"
      }
    }'
}

# Sink for the topic produced by the Postgres source connector
# (topic.prefix "postgres_" + table "logical"); empty row key definition.
create_connector confluent_postgres postgres_logical postgres_confluent_table ""
create_postgres_source.sh:
#!/bin/bash
set -euo pipefail

# Registers a JDBC source connector for the Postgres table `logical` via the
# Kafka Connect REST API on localhost:8083 and prints the response body on
# stdout.
#
# Arguments:
#   $1  connector name
#
# Returns non-zero with a usage message when the argument count is wrong.
# The name is spliced into the JSON payload verbatim, so it must not contain
# characters that would need JSON escaping.
function create_connector() {
  # Explicit check: a bare `[[ ... ]]` would abort silently under `set -e`
  # and be skipped altogether if the function were called in a `||` context.
  if [[ $# -ne 1 ]]; then
    echo "usage: create_connector NAME" >&2
    return 1
  fi

  local connector_name=$1

  # -sS: no progress meter (the caller pipes the output), but still report
  # transport errors on stderr.
  # The payload is single-quoted on purpose: `ExtractField$Key` must reach
  # Connect literally rather than being expanded by the shell.
  curl \
    -sS \
    -X POST \
    -H "Content-Type: application/json" \
    http://localhost:8083/connectors \
    --data '{
      "name": "'"$connector_name"'",
      "config": {
        "name": "'"$connector_name"'",
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://postgres:5432/db",
        "connection.user": "user",
        "connection.password": "password",
        "table.whitelist": "logical",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "topic.prefix": "postgres_",
        "poll.interval.ms": "1000",
        "batch.max.rows": "1000",
        "errors.tolerance": "all",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "transforms":"createKey,extractInt",
        "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields":"id",
        "transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractInt.field":"id"
      }
    }'
}

create_connector postgres
There are some dependencies:
- postgres client
- bash
- docker (with the compose plugin)
- jq
- the .jar with the Bigtable sink
Also, the sink will complain about "FIXME" as GCP credentials, so provide something of the correct shape. I don't remember whether it works with invalid credentials; you can try starting with `integration/fixtures/fake-service-account.json` from the google/oauth2l repository on GitHub (at commit 41b41476bdc146b1bb25c08a6ad1cb2468205fff).