Specifying a key converter for Debezium

Hi folks,

I’m pulling CDC data from SQL Server using Debezium successfully, but when I look at the topic, I see that Connect is guessing at the key’s datatype:

print 'prismdata_cdc_prismdata.dbo.Affiliation' from beginning limit 1;
Key format: AVRO or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO
rowtime: 2021/09/17 15:31:01.064 Z, key: {"Id": "25D87CEE-47AD-4612-91FD-0001A8DA43AB"}, value: {"Id": "25D87CEE-47AD-4612-91FD-0001A8DA43AB", 
...

The problem is that when I create a table over this topic à la:

CREATE TABLE IF NOT EXISTS AffiliationTest (RowKey VARCHAR PRIMARY KEY) WITH (VALUE_FORMAT='AVRO', KEY_FORMAT='KAFKA', KAFKA_TOPIC='prismdata_cdc_prismdata.dbo.Affiliation');

the RowKey and the Id are not identical, which breaks any joins that use this table.

select RowKey, Id from AffiliationTest emit changes limit 1;
+----------------------------------------------------------------+----------------------------------------------------------------+
|ROWKEY                                                          |ID                                                              |
+----------------------------------------------------------------+----------------------------------------------------------------+
|H25D87CEE-47AD-4612-91FD-0001A8DA43AB                           |25D87CEE-47AD-4612-91FD-0001A8DA43AB                            |

I’m assuming that this is a serialization issue that should be fixed with a key converter in the connector. If I try to add a "key.converter": "org.apache.kafka.connect.storage.StringConverter" to the connector, the Id of the record becomes a struct:

+----------------------------------------------------------------+----------------------------------------------------------------+
|ROWKEY                                                          |ID                                                              |
+----------------------------------------------------------------+----------------------------------------------------------------+
|Struct{Id=25D87CEE-47AD-4612-91FD-0001A8DA43AB}                 |25D87CEE-47AD-4612-91FD-0001A8DA43AB                            |

Hoping that someone can explain what is happening and how to get the key in a state where I can do joins successfully.

Thanks in advance!

Hmm this is an interesting one. First, a small clarification:

Connect is guessing

It’s ksqlDB that’s doing the guessing; when you run PRINT, it does its best to infer the serialisation format from the data that it reads. :slight_smile:

But on to the main problem. Perhaps you can confirm, but my assumption (a guess, just like ksqlDB’s) is that the key is being serialised using Avro? It’s weird that the ROWKEY value is the same as ID except with the leading H.
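If the key really is Avro on the wire, that leading H is actually what you’d expect: Confluent’s wire format prepends a magic byte plus a 4-byte schema ID (usually unprintable), and Avro then length-prefixes the string itself with a zig-zag varint — for a 36-character UUID that length byte is 72, i.e. ASCII ‘H’. A quick sketch (the schema ID here is made up):

```python
import struct

def zigzag_varint(n: int) -> bytes:
    """Avro zig-zag varint encoding, used for string lengths."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        b = z & 0x7F
        z >>= 7
        if z:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

uuid_str = "25D87CEE-47AD-4612-91FD-0001A8DA43AB"  # 36 chars

# Confluent wire format: magic byte 0x00, 4-byte big-endian schema ID,
# then the Avro payload (here: varint length + string bytes).
schema_id = 1  # hypothetical; the real ID lives in Schema Registry
wire = (b"\x00" + struct.pack(">I", schema_id)
        + zigzag_varint(len(uuid_str)) + uuid_str.encode())

# Read the key back as a raw string, the way KAFKA_STRING would:
# five mostly-unprintable header bytes, then 'H', then the UUID.
print(wire.decode("latin-1"))
print(chr(zigzag_varint(36)[0]))  # zig-zag(36) == 72 == ASCII 'H'
```

If that holds, the other printable junk you see before the H would just be schema-ID bytes that happen to land in the printable ASCII range.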

What version of ksqlDB are you running? You might want to read up on ksqlDB’s support for complex keys. One thing to try is

SELECT ROWKEY -> ID, ID FROM AffiliationTest emit changes limit 1;

However, I think ultimately this is going to be one for the #ksqldb team to help out with rather than Kafka Connect per se, so I’m going to move the topic to that category.

Hi Robin,

Thanks for your clarification :slight_smile:

Interestingly, the key, even though it is printed to the console as a Struct, appears to be a string under the covers:

ksql> select Id, RowKey from PV_PRISMDATA_AFFILIATION emit changes limit 1;
+----------------------------------------------------------------+----------------------------------------------------------------+
|ID                                                              |ROWKEY                                                          |
+----------------------------------------------------------------+----------------------------------------------------------------+
|25D87CEE-47AD-4612-91FD-0001A8DA43AB                            |Struct{Id=25D87CEE-47AD-4612-91FD-0001A8DA43AB}                 |

ksql> select ROWKEY->ID from PV_PRISMDATA_AFFILIATION emit changes limit 1;
Expected STRUCT type, got: STRING

The ksqlDB server and CLI are running in a docker-compose network and are both at 6.2.0:

services:

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:6.2.0
...
  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:6.2.0

To make sure there’s no manner of crazy for this topic in the Schema Registry, here’s what I grep out of it:

 curl -H "Content-Type: application/vnd.schemaregistry.v1+json" -s http://localhost:8081/subjects | jq '.' | grep prismdata_cdc_prismdata.dbo.Affiliation
  "prismdata_cdc_prismdata.dbo.Affiliation-value",

If it helps, the debezium connector config looks like this:

{
    "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max" : "1",
    "database.server.name" : "prismdata",
    "database.hostname" : "prismdata",
    "database.port" : "1433",
    "database.user" : "sa",
    "database.password" : "HackMe",
    "database.dbname" : "PrismData",
    "database.history.kafka.bootstrap.servers" : "broker:29092",
    "database.history.kafka.topic": "schema-changes.prismdata",
    "table.whitelist":"dbo.Affiliation",
    "decimal.handling.mode":"double",
    "transforms": "unwrap,addTopicPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addTopicPrefix.regex":"(.*)",
    "transforms.addTopicPrefix.replacement":"prismdata_cdc_$1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }

The JDBC driver for SQL Server that Connect is using is 8.4.1. From the Dockerfile:

COPY resources/mssql-jdbc-8.4.1.jre11.jar /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ 

And the connector itself is, from the Dockerfile:

RUN   confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:1.2.2

Thanks for forwarding this to the ksqlDB team! A fun Monday puzzle for the gang, no doubt :wink:

Cheers,
_T

What if you switch back to the Avro key converter and then try the select ROWKEY->ID… again? If it’s a string, then I wouldn’t have expected it to work.

Hi Robin,

After adding the recommended lines to the config JSON:

   "key.converter":"io.confluent.connect.avro.AvroConverter",
   "key.converter.schema.registry.url":"http://schema-registry:28081",
   "value.converter":"io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url":"http://schema-registry:28081"

The key is no longer spoofing as a struct (as it was with the StringConverter), but has reverted to the following:

ksql> print 'prismdata_cdc_prismdata.dbo.Affiliation' from beginning limit 1;
Key format: AVRO or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: AVRO
rowtime: 2021/09/20 18:46:25.456 Z, key: {"Id": "25D87CEE-47AD-4612-91FD-0001A8DA43AB"}, 

ksql> select RowKey, Id from PV_PRISMDATA_AFFILIATION emit changes limit 1;
+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
|ROWKEY                                                                                        |ID                                                                                            |
+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
|H25D87CEE-47AD-4612-91FD-0001A8DA43AB                                                         |25D87CEE-47AD-4612-91FD-0001A8DA43AB                                                          |

The underlying tables use the sql uniqueidentifier type for the Id (the declared PK).

I wasn’t too sure whether this type is well supported, so as an experiment I tested a couple of quick modifications to the underlying schema.
If I change the Id column in the base table to a varchar ( [Id] [varchar](127) NOT NULL ), we get:

ksql> select RowKey, Id from PV_prismdata_AffiliationStr emit changes limit 1;
+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
|ROWKEY                                                                                        |ID                                                                                            |
+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
|$H0000A822-7219-4833-AF2D-4A6A71B6D356                                                        |0000A822-7219-4833-AF2D-4A6A71B6D356                                                          |

And similarly if I give the Id column an explicit default ( [Id] [uniqueidentifier] NOT NULL DEFAULT NEWSEQUENTIALID() ):

ksql> select RowKey, Id from PV_prismdata_AffiliationSeq emit changes limit 1;
+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
|ROWKEY                                                                                        |ID                                                                                            |
+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
|&H25D87CEE-47AD-4612-91FD-0001A8DA43AB                                                        |25D87CEE-47AD-4612-91FD-0001A8DA43AB                                                          |
Limit Reached

Any sense as to what bytes are being prepended to the Id when the table’s PK is being defined? Is there some clever thing I can do to explicitly define the stream’s key as a simple string? Currently it appears to think the key is JSON…

Thanks!
_T

For the AvroConverter option, have you tried setting KEY_FORMAT='AVRO'?

Another possibility with the original JSON key would be to define the key as ROWKEY STRUCT<ID STRING> PRIMARY KEY and KEY_FORMAT='JSON'
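Spelled out as DDL, those two options might look something like this (untested sketches; table names assumed and value columns elided):

```sql
-- Option 1: declare the key format as Avro and let ksqlDB
-- deserialise it via Schema Registry
CREATE TABLE AffiliationAvroKey (
    Id VARCHAR PRIMARY KEY
  ) WITH (
    KAFKA_TOPIC  = 'prismdata_cdc_prismdata.dbo.Affiliation',
    KEY_FORMAT   = 'AVRO',
    VALUE_FORMAT = 'AVRO'
  );

-- Option 2: for the original JSON key, declare it as a struct
CREATE TABLE AffiliationJsonKey (
    ROWKEY STRUCT<ID STRING> PRIMARY KEY
  ) WITH (
    KAFKA_TOPIC  = 'prismdata_cdc_prismdata.dbo.Affiliation',
    KEY_FORMAT   = 'JSON',
    VALUE_FORMAT = 'AVRO'
  );
```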

Hi Gents,

Thanks for your replies on this. In the end, what I did was to include the key in the value part of the message (i.e. AS_VALUE(ID) AS Value_ID) and then PARTITION BY on that into a new stream. This seemed to avoid the strange prepended bytes and so allowed me to skate around the bad-join issue. I will leave the underlying issue of the prepend as a mystery of the universe for other minds to contemplate.
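For future readers, that workaround might look something like this (a sketch only; stream names assumed and other value columns elided):

```sql
-- Source stream over the CDC topic, ignoring the awkwardly serialised key
CREATE STREAM AFFILIATION_SRC (
    Id VARCHAR
  ) WITH (
    KAFKA_TOPIC  = 'prismdata_cdc_prismdata.dbo.Affiliation',
    VALUE_FORMAT = 'AVRO'
  );

-- Re-key on Id; AS_VALUE keeps a copy of it in the message value for joins
CREATE STREAM AFFILIATION_REKEYED
  WITH (KEY_FORMAT = 'KAFKA') AS
  SELECT Id, AS_VALUE(Id) AS Value_Id
  FROM AFFILIATION_SRC
  PARTITION BY Id;
```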

Many thanks!
_Tim