Table to Table join

Hi everyone,
I used this docker-compose file to get started with ksqlDB:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.5.1
    container_name: broker
    depends_on:
      - zookeeper
    ports:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.1
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - zookeeper
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181

  ksqldb:
    # *-----------------------------*
    # To connect to ksqlDB CLI
    #   docker exec --interactive --tty ksqldb ksql http://localhost:8088
    # *-----------------------------*
    image: confluentinc/ksqldb-server:0.19.0
    container_name: ksqldb
    depends_on:
      - broker
    ports:
      - "8088:8088"
      - "8083:8083"
    user: root
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_SERVICE_ID: vitec_01
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
      # Setting KSQL_KSQL_CONNECT_WORKER_CONFIG enables embedded Kafka Connect
      KSQL_KSQL_CONNECT_WORKER_CONFIG: "/etc/ksqldb/connect.properties"
      # Kafka Connect config below
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: 'ksqldb'
      KSQL_CONNECT_REST_PORT: 8083
      KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
      KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      KSQL_CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      KSQL_CONNECT_PLUGIN_PATH: '/usr/share/java'

    command:
      # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        # ------ hack to workaround absence of confluent-hub client
        # mkdir -p /usr/share/confluent-hub-components/
        # confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ debezium/debezium-connector-sqlserver:1.2.2
        curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/debezium/debezium-connector-sqlserver/versions/1.2.2/debezium-debezium-connector-sqlserver-1.2.2.zip -o /tmp/kafka-connect-mssql.zip
        yum install -y unzip
        unzip /tmp/kafka-connect-mssql.zip -d /usr/share/java/
        # ----------------------------------------------------------
        #
        echo "Launching ksqlDB"
        /usr/bin/docker/run &
        sleep infinity

  kafkacat:
    # Utility container for inspecting topics from the command line
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    links:
      - broker
    entrypoint:
      - /bin/sh
      - -c
      - |
        # install jq for pretty-printing JSON, then keep the container alive
        apk add jq;
        while :; do sleep 60; done
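
Assuming the file is saved as docker-compose.yml, the stack comes up and the ksqlDB CLI attaches with (the exec command is the same one noted in the comment inside the file):

docker-compose up -d
docker exec --interactive --tty ksqldb ksql http://localhost:8088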

I created connectors for my Person and Job tables using the Debezium SQL Server connector:

CREATE SOURCE CONNECTOR TEST_PERSON WITH (
    'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector',
    'tasks.max'= '1',
    'database.hostname'= 'host',
    'database.port'= '1433',
    'database.user'= 'user',
    'database.password'= 'password',
    'database.dbname'= 'test',
    'database.server.name'= 'test',
    'table.whitelist'= 'dbo.Person',
    'database.history.kafka.bootstrap.servers'= 'broker:29092',
    'database.history.kafka.topic'= 'dbhistory.person',
    'decimal.handling.mode' = 'double'
);

CREATE SOURCE CONNECTOR TEST_JOB WITH (
    'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector',
    'tasks.max'= '1',
    'database.hostname'= 'host',
    'database.port'= '1433',
    'database.user'= 'user',
    'database.password'= 'password',
    'database.dbname'= 'test',
    'database.server.name'= 'test',
    'table.whitelist'= 'dbo.Job',
    'database.history.kafka.bootstrap.servers'= 'broker:29092',
    'database.history.kafka.topic'= 'dbhistory.job',
    'decimal.handling.mode' = 'double'
);

Then I checked that they were up and running:

ksql> show connectors;

 Connector Name | Type   | Class                                              | Status
------------------------------------------------------------------------------------------------------------
 TEST_PERSON    | SOURCE | io.debezium.connector.sqlserver.SqlServerConnector | RUNNING (1/1 tasks RUNNING)
 TEST_JOB       | SOURCE | io.debezium.connector.sqlserver.SqlServerConnector | RUNNING (1/1 tasks RUNNING)
------------------------------------------------------------------------------------------------------------

ksql> show topics;
 Kafka Topic     | Partitions | Partition Replicas
----------------------------------------------------
 dbhistory.test  | 1          | 1
 test            | 1          | 1
 test.dbo.Person | 1          | 1
 test.dbo.Job    | 1          | 1
----------------------------------------------------


ksql> print 'test.dbo.Person' from beginning LIMIT 1;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO
rowtime: 2021/11/29 14:50:38.351 Z, key: [Struct{@5711232088909099389/-], value: {"before": null, "after": {"Id": 1, "JobId": "10", "Name": "Test", "SurName": "Testson", "SSN": "19741214"}, "source": {"version": "1.2.2.Final", "connector": "sqlserver", "name": "test", "ts_ms": 1638197437468, "snapshot": "true", "db": "TEST", "schema": "dbo", "table": "test", "change_lsn": null, "commit_lsn": "000007a6:0000ed60:0001", "event_serial_no": null}, "op": "r", "ts_ms": 1638197437469, "transaction": null}, partition: 0

So far so good.
After that I created the ksqlDB tables:

CREATE TABLE Person (
    Id VARCHAR PRIMARY KEY,
    JobId INT
  ) WITH (
    KAFKA_TOPIC='test.dbo.Person',
    PARTITIONS=1,
    VALUE_FORMAT='AVRO'
  );

CREATE TABLE Job (
    Id VARCHAR PRIMARY KEY,
    Name VARCHAR  -- needed for the join below
  ) WITH (
    KAFKA_TOPIC='test.dbo.Job',
    PARTITIONS=1,
    VALUE_FORMAT='AVRO'
  );

And here is when I encountered a problem:

SELECT * FROM Person EMIT CHANGES LIMIT 3;
|Struct{Id=6285} |null |
|Struct{Id=6286} |null |
|Struct{Id=6287} |null |

When using Id BIGINT I get the error “Error deserializing KAFKA message: Size of data received by IntegerDeserializer is not 4”. I think it's because the key value starts with Struct{Id=. After changing the type to VARCHAR I can pull the Person table, but JobId is always null. The Job table has the same problem. So my question is: how do I fix that?
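
To see what the key actually looks like on the wire, the kafkacat container from the compose file above can print the raw key and value. This is just a diagnostic sketch; topic and broker names are the ones from the setup above:

docker exec kafkacat kafkacat -b broker:29092 -C -t test.dbo.Person -c 1 \
  -f 'Key (%K bytes): %k\nValue: %s\n'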

Once that is fixed, I was thinking of doing the join like this:

CREATE TABLE Joined AS
  SELECT Person.Id, Person.JobId, Job.Name
  FROM Person
  INNER JOIN Job
  ON Person.JobId = Job.Id
  EMIT CHANGES;
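
A persistent CREATE TABLE AS doesn’t accept a LIMIT, so to preview a few rows of the join first, a transient push query like this should work:

SELECT Person.Id, Person.JobId, Job.Name
FROM Person
INNER JOIN Job ON Person.JobId = Job.Id
EMIT CHANGES
LIMIT 3;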

Thanks in advance!

I got it to work!

I added these properties to the source connectors:

    'include.schema.changes' = 'false',
    'transforms' = 'unwrap,extractkey',
    'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.extractkey.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.extractkey.field' = 'Id',
    'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter' = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081'
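
For reference, this is what the complete TEST_PERSON connector looks like with those properties merged in (the connection details are the same placeholders as before). The unwrap transform flattens Debezium's before/after envelope into a plain row, and extractkey pulls the Id field out of the Struct key so the StringConverter can write it as a plain string:

CREATE SOURCE CONNECTOR TEST_PERSON WITH (
    'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector',
    'tasks.max'= '1',
    'database.hostname'= 'host',
    'database.port'= '1433',
    'database.user'= 'user',
    'database.password'= 'password',
    'database.dbname'= 'test',
    'database.server.name'= 'test',
    'table.whitelist'= 'dbo.Person',
    'database.history.kafka.bootstrap.servers'= 'broker:29092',
    'database.history.kafka.topic'= 'dbhistory.person',
    'decimal.handling.mode' = 'double',
    'include.schema.changes' = 'false',
    'transforms' = 'unwrap,extractkey',
    'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.extractkey.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.extractkey.field' = 'Id',
    'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter' = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081'
);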

With this in place I can create the tables and do the join:

CREATE TABLE Person (
    rowkey VARCHAR PRIMARY KEY,
    Id BIGINT,
    JobId BIGINT
) WITH (
    KAFKA_TOPIC='test.dbo.Person',
    VALUE_FORMAT='AVRO'
);
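
The Job table is defined the same way; a sketch, assuming its topic carries Id and Name after the same transforms:

CREATE TABLE Job (
    rowkey VARCHAR PRIMARY KEY,
    Id BIGINT,
    Name VARCHAR
) WITH (
    KAFKA_TOPIC='test.dbo.Job',
    VALUE_FORMAT='AVRO'
);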

But I don’t get any changes from the database. I checked CDC with these SQL commands, and both the database and the table have it enabled. From cdc.dbo.Person_CT I can see that the changes are logged:

-- Check if CDC is enabled for database
SELECT name, is_cdc_enabled
    FROM sys.databases where is_cdc_enabled = 1

-- Check if CDC is enabled for table
SELECT [name] AS TableName
FROM sys.tables  WHERE is_tracked_by_cdc =1
GO

-- List all cdc
exec sys.sp_cdc_help_change_data_capture

cdc.dbo.Person_CT:

__$start_lsn           | __$end_lsn | __$seqval              | __$operation | __$update_mask | Id | JobId
0x000007B3000180780003 | NULL       | 0x000007B3000180780002 | 3            | 0x04           | 1  | 2
0x000007B3000180780003 | NULL       | 0x000007B3000180780002 | 4            | 0x04           | 1  | 3

But the test.dbo.Person topic doesn’t get the changes. Does anyone know why?
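
One way to dig into this is the embedded Connect worker’s REST API (port 8083 is exposed in the compose file above), which reports the task state and any stack trace. Assuming curl and jq are available on the host:

curl -s http://localhost:8083/connectors/TEST_PERSON/status | jq '.'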
