# Produce one test record to topic 'account_base' with an Avro key AND an Avro value.
#   - parse.key=true + key.separator=, splits each stdin line into key,value.
#   - key.schema is a bare Avro {"type":"string"}, so the key is Avro-serialized
#     through Schema Registry (magic byte + schema id + payload), NOT a plain
#     Kafka string.
#   - value.schema is a nested record: data.ACCOUNT[] and data.ACCOUNT_DETAILS[]
#     arrays of records, plus an 'operation' string.
# NOTE(review): the stream created later reports "Key format : KAFKA"; an
# Avro-framed key may not be readable as KAFKA_STRING — confirm whether this
# mismatch is what triggers the CSAS deserialization error reported below.
# The last line ("1234",{...}) is the sample input typed into the producer's
# stdin, not part of the command.
kafka-avro-console-producer \
--broker-list XXXXXXXX:9092 --topic account_base \
--property parse.key=true \
--property key.separator=, \
--property key.schema='{"type":"string"}' \
--property value.schema='{"type": "record","name": "accountdetails","fields": [{"name": "data","type": {"name": "data","type": "record","fields": [{"name": "ACCOUNT","type": {"type": "array","items": {"name": "ACCNT_record","type": "record","fields": [{"name": "accnt_id","type": "string"},{"name": "a1","type": "string"},{"name": "a2","type": "string"},{"name": "a3","type": "string"},{"name": "a4","type": "string"}]}}},{"name": "ACCOUNT_DETAILS","type": {"type": "array","items": {"name": "ACCNT_DETAILS_record","type": "record","fields": [{"name": "accnt_id","type": "string"},{"name": "b1","type": "string"},{"name": "b2","type": "string"},{"name": "m","type": "string"}]}}}]}},{"name": "operation","type": "string"}]}' \
--property schema.registry.url=https://XXXXXXXX:8081 \
--property basic.auth.credentials.source=USER_INFO \
--property schema.registry.basic.auth.user.info='xxx:xxxx'
"1234",{"data":{"ACCOUNT":[{"accnt_id":"1234","a1":"10","a2":"20","a3":"30","a4":"40"}],"ACCOUNT_DETAILS":[{"accnt_id":"1234","b1":"50","b2":"60","m":"1"},{"accnt_id":"1234","b1":"70","b2":"80","m":"2"},{"accnt_id":"1234","b1":"90","b2":"100","m":"3"}]},"operation":"update"}
-- Start consuming from the beginning of the topic so already-produced
-- records are visible to subsequent queries.
SET 'auto.offset.reset' = 'earliest';

-- Source stream over the Avro-valued topic; the value schema is pulled
-- from Schema Registry. No key column is declared, so the key defaults
-- to the KAFKA format (as confirmed by DESCRIBE below).
CREATE STREAM st_accnt_base WITH (KAFKA_TOPIC = 'account_base', VALUE_FORMAT = 'AVRO');

DESCRIBE st_accnt_base;
....
Key format : KAFKA
Value format : AVRO
.....
ksql> select explode(data->ACCOUNT) as ACCOUNT from st_accnt_base emit changes;
+-----------------------------------------------------------------------------------------------+
|ACCOUNT |
+-----------------------------------------------------------------------------------------------+
|{ACCNT_ID=1234, A1=10, A2=20, A3=30, A4=40} |
-- Persistent query: unnest the data->ACCOUNT array into one row per element,
-- writing the result to a new backing topic for stream t_accnt.
CREATE STREAM t_accnt AS
    SELECT EXPLODE(data->ACCOUNT) AS ACCOUNT
    FROM st_accnt_base
    EMIT CHANGES;

DESCRIBE EXTENDED t_accnt;
....
Key format : KAFKA
Value format : AVRO
....
ksql> select * from t_accnt emit changes;
+-----------------------------------------------------------------------------------------------+
|ACCOUNT |
+-----------------------------------------------------------------------------------------------+
No output is produced.
In the ksql-streams log, the following warning was found:
[2018-09-17 12:29:09,929] WARN stream-thread [_confluent-ksql-default_query_CSAS_T_ACCNT_531_98722fferP_637hijk_lm78262_37hk-wyh9-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[account_base] partition=[0] offset=[1] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
print 'account_base' from beginning;
Key format: AVRO or HOPPING(KAFKA_STRING), TUMBLING(KAFKA_STRING), KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2021/02/24 14:35:50.265 Z, key: 1234, value: {"data": {"ACCOUNT": [{"accnt_id": "1234", "a1": "10", "a2": "20", "a3": "30", "a4": "40"}], "ACCOUNT_DETAILS": [{"accnt_id": "1234", "b1": "50", "b2": "60", "m": "1"}, {"accnt_id": "1234", "b1": "70", "b2": "80", "m": "2"}, {"accnt_id": "1234", "b1": "90", "b2": "100", "m": "3"}]}, "operation": "update"}
I am able to see the data of the source topic account_base with both `print` and kafka-avro-console-consumer.
Even though the key exists in the source topic, I am not sure why it works with SELECT but not with CSAS.
Any idea why it is unable to deserialize the data in the CSAS query while the plain SELECT works?
Please help or suggest how to resolve this situation.