Hi everyone,
I used this docker-compose file to get started with ksqlDB:
version: '2'
# Local single-node stack: ZooKeeper, one Kafka broker, Schema Registry,
# a ksqlDB server with an embedded Kafka Connect worker, and a kafkacat
# utility container.
services:
  # ZooKeeper instance referenced by the broker (KAFKA_ZOOKEEPER_CONNECT)
  # and by Schema Registry (SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL).
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"

  # Single Kafka broker with two listeners: PLAINTEXT (broker:29092) for
  # containers on the docker network and PLAINTEXT_HOST (localhost:9092)
  # for clients on the docker host.
  broker:
    image: confluentinc/cp-kafka:5.5.1
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
      # An important note about accessing Kafka from clients on other machines:
      # -----------------------------------------------------------------------
      #
      # The config used here exposes port 9092 for _external_ connections to the broker
      # i.e. those from _outside_ the docker network. This could be from the host machine
      # running docker, or maybe further afield if you've got a more complicated setup.
      # If the latter is true, you will need to change the value 'localhost' in
      # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
      # remote clients
      #
      # For connections _internal_ to the docker network, such as from other services
      # and components, use broker:29092.
      #
      # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
      # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
      #
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      # Replication factors and min ISR are 1 because there is only one broker.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "100"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"

  # Schema Registry, published on host port 8081; ksqlDB and the Connect
  # Avro converter reach it at http://schema-registry:8081.
  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.1
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - zookeeper
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181

  ksqldb:
    # *-----------------------------*
    # To connect to ksqlDB CLI
    # docker exec --interactive --tty ksqldb ksql http://localhost:8088
    # *-----------------------------*
    image: confluentinc/ksqldb-server:0.19.0
    container_name: ksqldb
    depends_on:
      - broker
    ports:
      # 8088: ksqlDB listener (KSQL_LISTENERS); 8083: Connect REST port.
      - "8088:8088"
      - "8083:8083"
    # Runs as root; the startup script below calls yum and writes to /usr/share/java.
    user: root
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_SERVICE_ID: vitec_01
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
      # Setting KSQL_KSQL_CONNECT_WORKER_CONFIG enables embedded Kafka Connect
      KSQL_KSQL_CONNECT_WORKER_CONFIG: "/etc/ksqldb/connect.properties"
      # Kafka Connect config below
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: 'ksqldb'
      KSQL_CONNECT_REST_PORT: "8083"
      KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
      # Keys are handled by StringConverter, values by the Avro converter.
      KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      KSQL_CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      # The connector zip below is unpacked into this directory.
      KSQL_CONNECT_PLUGIN_PATH: '/usr/share/java'
    command:
      # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        # ------ hack to workaround absence of confluent-hub client
        # mkdir -p /usr/share/confluent-hub-components/
        # confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ debezium/debezium-connector-sqlserver:1.2.2
        curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip -o /tmp/kafka-connect-mssql.zip
        yum install -y unzip
        unzip /tmp/kafka-connect-mssql.zip -d /usr/share/java/
        # ----------------------------------------------------------
        #
        echo "Launching ksqlDB"
        /usr/bin/docker/run &
        sleep infinity

  # Utility container: installs jq, then loops on sleep so the container
  # stays up.
  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    links:
      - broker
    entrypoint:
      - /bin/sh
      - -c
      - |
        apk add jq;
        while [ 1 -eq 1 ];do sleep 60;done
I created two connectors, one for my Person table and one for my Job table, using the Debezium SQL Server connector:
-- Source connector TEST_PERSON: Debezium SqlServerConnector capturing the
-- dbo.Person table of database 'test' under the logical server name 'test'.
-- The database history topic is 'dbhistory.person', reached via broker:29092.
CREATE SOURCE CONNECTOR TEST_PERSON WITH (
    'connector.class'                          = 'io.debezium.connector.sqlserver.SqlServerConnector',
    'tasks.max'                                = '1',
    -- connection settings
    'database.hostname'                        = 'host',
    'database.port'                            = '1433',
    'database.user'                            = 'user',
    'database.password'                        = 'password',
    'database.dbname'                          = 'test',
    'database.server.name'                     = 'test',
    -- captured table
    'table.whitelist'                          = 'dbo.Person',
    -- database history storage
    'database.history.kafka.bootstrap.servers' = 'broker:29092',
    'database.history.kafka.topic'             = 'dbhistory.person',
    'decimal.handling.mode'                    = 'double'
);
-- Source connector TEST_JOB: Debezium SqlServerConnector capturing the
-- dbo.Job table of database 'test' under the logical server name 'test'.
-- The database history topic is 'dbhistory.job', reached via broker:29092.
CREATE SOURCE CONNECTOR TEST_JOB WITH (
    'connector.class'                          = 'io.debezium.connector.sqlserver.SqlServerConnector',
    'tasks.max'                                = '1',
    -- connection settings
    'database.hostname'                        = 'host',
    'database.port'                            = '1433',
    'database.user'                            = 'user',
    'database.password'                        = 'password',
    'database.dbname'                          = 'test',
    'database.server.name'                     = 'test',
    -- captured table
    'table.whitelist'                          = 'dbo.Job',
    -- database history storage
    'database.history.kafka.bootstrap.servers' = 'broker:29092',
    'database.history.kafka.topic'             = 'dbhistory.job',
    'decimal.handling.mode'                    = 'double'
);
Then checked if they were up and running:
ksql> show connectors;
Connector Name | Type | Class | Status
------------------------------------------------------------------------------------------------------------
TEST_PERSON | SOURCE | io.debezium.connector.sqlserver.SqlServerConnector | RUNNING (1/1 tasks RUNNING)
TEST_JOB | SOURCE | io.debezium.connector.sqlserver.SqlServerConnector | RUNNING (1/1 tasks RUNNING)
------------------------------------------------------------------------------------------------------------
ksql> show topics;
Kafka Topic | Partitions | Partition Replicas
----------------------------------------------------
dbhistory.test | 1 | 1
test | 1 | 1
test.dbo.Person | 1 | 1
test.dbo.Job | 1 | 1
----------------------------------------------------
ksql> print 'test.dbo.Person' from beginning LIMIT 1;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO
rowtime: 2021/11/29 14:50:38.351 Z, key: **[Struct{@5711232088909099389/-]**, value: {"before": null, "after": {"Id": 1, "JobId": "10", "Name": "Test", "SurName": "Testson", "SSN": "19741214"}, "source": {"version": "1.2.2.Final", "connector": "sqlserver", "name": "test", "ts_ms": 1638197437468, "snapshot": "true", "db": "TEST", "schema": "dbo", "table": "test", "change_lsn": null, "commit_lsn": "000007a6:0000ed60:0001", "event_serial_no": null}, "op": "r", "ts_ms": 1638197437469, "transaction": null}, partition: 0
So far so good.
After that I created the ksqldb table:
-- Declares the ksqlDB table Person over topic 'test.dbo.Person' (AVRO values),
-- with Id as a VARCHAR primary key and JobId as an INT value column.
-- NOTE(review): the record printed from this topic nests the row fields under
-- "after" and shows the key as a Struct{...} string; a top-level JobId column
-- presumably does not match that value layout -- confirm against the schema.
CREATE TABLE Person (
> Id VARCHAR PRIMARY KEY,
> JobId INT
> ) WITH (
> KAFKA_TOPIC='test.dbo.Person',
> PARTITIONS=1,
> VALUE_FORMAT='AVRO'
> );
-- Declares the ksqlDB table Job over topic 'test.dbo.Job' (AVRO values),
-- keyed by a VARCHAR primary key Id. Id is the only declared column, so it
-- must not be followed by a comma before the closing parenthesis.
-- NOTE(review): the join further down selects Job.Name, which is not declared
-- here -- confirm whether a Name column was meant to be listed.
CREATE TABLE Job (
> Id VARCHAR PRIMARY KEY
> ) WITH (
> KAFKA_TOPIC='test.dbo.Job',
> PARTITIONS=1,
> VALUE_FORMAT='AVRO'
> );
And here is when I encountered a problem:
SELECT * FROM Person EMIT CHANGES Limit 3;
|Struct{Id=6285} |null |
|Struct{Id=6286} |null |
|Struct{Id=6287} |null |
When using “Id BIG INT” I’m getting the error “Error deserializing KAFKA message Size of data received by IntegerDeserializer is not 4”. I think it's because the value starts with Struct{Id=. After changing it to VARCHAR I could query the Person table, but JobId is null. I have the same problem with the Job table. So my question is: how do I fix that?
Once that is fixed, I was thinking of doing the join like this:
-- Persistent query that materializes table Joined from Person joined to Job
-- on Person.JobId = Job.Id.
-- No LIMIT clause: LIMIT belongs on transient SELECT ... EMIT CHANGES queries,
-- not on CREATE TABLE ... AS SELECT -- verify against the ksqlDB reference
-- for the server version in use.
-- NOTE(review): Person.JobId is declared INT and Job.Id VARCHAR, and Job.Name
-- is not declared on table Job -- confirm column types and names before running.
CREATE TABLE Joined AS
  SELECT Person.Id, Person.JobId, Job.Name
  FROM Person
  INNER JOIN Job
  ON Person.JobId = Job.Id
  EMIT CHANGES;
Thanks in advance!