CREATE TABLE issue

So I have a topic with stuff in it from CDC:

ksql> print 'mssqllatest.dbo.emp' from beginning;
Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2022/07/03 01:14:24.666 Z, key: {"empno":2}, value: {"empno":2,"ename":"HARDING","job":"MANAGER","mgr":9,"hiredate":886377600000,"sal":"T1iA","comm":"dTA=","dept":3}, partition: 0
rowtime: 2022/07/03 01:14:24.667 Z, key: {"empno":3}, value: {"empno":3,"ename":"TAFT","job":"SALES I","mgr":2,"hiredate":820540800000,"sal":"JiWg","comm":"AMNQ","dept":3}, partition: 0
rowtime: 2022/07/03 01:14:24.667 Z, key: {"empno":4}, value: {"empno":4,"ename":"HOOVER","job":"SALES I","mgr":2,"hiredate":639014400000,"sal":"KTLg","comm":null,"dept":3}, partition: 0
rowtime: 2022/07/03 01:14:24.667 Z, key: {"empno":5}, value: {"empno":5,"ename":"LINCOLN","job":"TECH","mgr":6,"hiredate":772329600000,"sal":"IlUQ","comm":"AiLg","dept":4}, partition: 0
rowtime: 2022/07/03 01:14:24.667 Z, key: {"empno":6}, value: {"empno":6,"ename":"GARFIELD","job":"MANAGER","mgr":9,"hiredate":736214400000,"sal":"UmXA","comm":null,"dept":4}, partition: 0
rowtime: 2022/07/03 01:14:24.667 Z, key: {"empno":7}, value: {"empno":7,"ename":"POLK","job":"TECH","mgr":6,"hiredate":874886400000,"sal":"JiWg","comm":null,"dept":4}, partition: 0

And I tried creating a TABLE based off that topic and querying it, so I did this:

SET 'auto.offset.reset' = 'earliest';
CREATE TABLE t2_emp (empno INT PRIMARY KEY, ename VARCHAR, dept INT) WITH (KAFKA_TOPIC='mssqllatest.dbo.emp', VALUE_FORMAT='JSON');
CREATE TABLE q2_emp AS SELECT * FROM t2_emp;

Finally, I tried querying:

ksql> select * from q2_emp emit changes;
+------------------------------------+------------------------------------+------------------------------------+
|EMPNO                               |ENAME                               |DEPT                                |
+------------------------------------+------------------------------------+------------------------------------+

Nadaā€¦ nothing thereā€¦

Any ideas as to what Iā€™m doing wrong?

Thanks!

I set up the processing log and spotted some serialization errorsā€¦ usually around INTEGER conversion on the KEY ā€¦ Iā€™m not sure what KSQL expects the key to be though ā€¦ value is JSON format. In the topic I see the key looks like JSON, so I tried adding KEY_FORMAT=ā€˜JSONā€™ but still errors:

[2022-07-05 05:43:06,002] WARN stream-thread [_confluent-ksql-default_transient_transient_EMP_417012816845244685_1656999785257-ad6eb880-a15a-487b-8cb2-6959ade007c9-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[mssqllatest.dbo.emp] partition=[0] offset=[16] (org.apache.kafka.streams.processor.internals.RecordDeserializer:89)

org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: mssqllatest.dbo.emp. Canā€™t convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $

Q: what type should I be serializing the key into in order for KSQL to work?

gave up on all that and went pure AVRO ā€¦ works great:

name = SqlServerCDCSrc
connector.class = io.debezium.connector.sqlserver.SqlServerConnector
database.server.name = mssqllatest
database.dbname = demo
database.hostname = 192.168.0.236
database.port = 1433
database.user = sa
database.password = Toughpass1!
database.instance = mssqllatest
database.history.kafka.bootstrap.servers = broker:29092
database.history.kafka.topic = dbhistory.mssqllatest
table.include.list = dbo.emp
key.converter = io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url = http://schema-registry:8081
value.converter = io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url = http://schema-registry:8081
transforms = unwrap
transforms.unwrap.type = io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones= true
transforms.unwrap.delete.handling.mode= none

along with:

/* using AVRO for both key and value */
SET 'auto.offset.reset' = 'earliest';

CREATE TABLE emp WITH (KAFKA_TOPIC='mssqllatest.dbo.emp', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');
SELECT * FROM emp EMIT CHANGES;

SET 'auto.offset.reset' = 'earliest';
CREATE TABLE q_emp AS SELECT * FROM emp;
SELECT * FROM q_emp;

ksqlDB expect the key to be ā€œunwrappedā€, but your data provides a ā€œwrappedā€ key. Thus, you would need to either serialize the key as primitive JSON, ie, just 2 instead of {"empno":2}, or you need to define the key a struct type: empno STRUCT<INT> PRIMARY KEY.

I assume that when you switch to AVRO, the key is written as primitive AVRO and that is why itā€™s working?

Yes I noticed the debezium connector was passing the key as Struct{empno=n}, so I tried the IntegerConverter, thinking that might just pass the number, but that didnā€™t work, then I tried the JSON converter together with an SMT to extract the numberā€¦ that didnā€™t work ā€¦ but for some reason specifying KEY_FORMAT=ā€˜AVROā€™ did the trick ā€¦ although I did notice it appended a new column ā€˜ROWKEYā€™ ā€¦ but heyā€¦ this was good enough :slight_smile:

1 Like

This topic was automatically closed after 30 days. New replies are no longer allowed.