Creating a stream from an Avro topic generated by Datagen

In Confluent Cloud, I am using the Datagen connector to generate messages in Avro format into a topic.

The Datagen connector has already created the topic with Avro-formatted data.

I want to create a stream from it in ksqlDB, so I executed the command below. The topic name in the WITH clause already exists; it was created by Datagen.

The stream is created, but when I query it with EMIT CHANGES, the query keeps running and no messages are returned.

Is this because of Avro serialization? If so, is it possible to create a stream from an Avro topic, and what should I change in my CREATE STREAM command?

CREATE STREAM TcProductVariant_stream_Avro (
  _comment STRING,
  data STRUCT<
    productInstanceUuid STRING,
    productInstanceId STRING,
    productInstanceDescription STRING,
    productInstanceRevision STRING,
    productVariants ARRAY<STRUCT<operation STRING, TC_variant_id STRING, tc_variant_name STRING, variantOptions STRING>>
  >
)
WITH (KAFKA_TOPIC = 'TcProductVariants_Avro',
      VALUE_FORMAT = 'AVRO',
      TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss',
      PARTITIONS = 6);

Yes, you can create a stream backed by a topic that a Datagen connector populates with Avro-formatted data. ksqlDB picks up the schema from Schema Registry, so you don’t even need to specify columns when creating the stream. For example, this alone should work:

CREATE STREAM TcProductVariant_stream_Avro
WITH (KAFKA_TOPIC='TcProductVariants_Avro', VALUE_FORMAT='AVRO');

Here is a tutorial that does this with one of Datagen’s built-in data sets.
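To see what ksqlDB actually inferred, for example to compare it against a hand-written column list, a DESCRIBE on the stream should list each column and its type, including nested STRUCT and ARRAY types. A minimal sketch, assuming the stream above was created successfully:

```sql
-- List the columns and types ksqlDB derived from the Avro value schema
-- registered in Schema Registry for the topic
DESCRIBE TcProductVariant_stream_Avro;
```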

I’m not sure why you aren’t seeing results when you query the stream. Possibly the connector had stopped and you queried with the default auto.offset.reset value of latest, which only returns messages that arrive after the query starts. As long as there is some data in the topic, you should see results if you set auto.offset.reset to earliest and then query the stream:

SET 'auto.offset.reset'='earliest';
SELECT * FROM TcProductVariant_stream_Avro EMIT CHANGES;
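If that still returns nothing, a quick check is whether the topic itself holds messages, independent of any stream definition. A minimal sketch using ksqlDB's PRINT statement with the topic name from the question (the LIMIT of 5 is arbitrary). If PRINT shows records but the SELECT does not, the records are likely failing to deserialize against the stream's declared columns; such records are skipped and reported in the ksqlDB processing log.

```sql
-- Dump raw records from the topic, starting at the earliest offset,
-- and stop after 5 records. The output header also shows the key and
-- value formats ksqlDB detected (AVRO would be expected here).
PRINT 'TcProductVariants_Avro' FROM BEGINNING LIMIT 5;
```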

What could be the reason this statement is failing? It says:

Exception while preparing statement: PRODUCTINSTANCE_VARIANT_OPTIONS does not exist.

The topic exists; it was generated by Datagen.

CREATE OR REPLACE STREAM TC_VARIANT_OPTIONS_STREAM_V2 WITH (
  KAFKA_TOPIC='TC_VARIANT_OPTIONS',
  PARTITIONS=3, REPLICAS=3,
  RETENTION_MS=604800000,
  TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss',
  VALUE_FORMAT='JSON')
AS SELECT
  ROWTIME AS source_row_time,
  FORMAT_TIMESTAMP(FROM_UNIXTIME(ROWTIME), 'yyyy-MM-dd HH:mm:ss.SSS', 'UTC') AS message_time,
  EXPLODE(VARIANTOPTIONS) AS original,
  JSON_KEYS(EXPLODE(VARIANTOPTIONS)) AS json_keys,
  JSON_RECORDS(EXPLODE(VARIANTOPTIONS)) AS json_records,
  MAP_KEYS(JSON_RECORDS(EXPLODE(VARIANTOPTIONS))) AS map_keys
FROM PRODUCTINSTANCE_VARIANT_OPTIONS;

However, if I change the FROM clause to refer to PRODUCTINSTANCE_VARIANT_OPTIONS_STREAM, a stream I created from that topic, it works.

Why does it fail when pointing at the topic?

Is PRODUCTINSTANCE_VARIANT_OPTIONS only a topic name? You can’t select directly from a topic. You have to define a stream (or table) that wraps the topic and select from that.
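A minimal sketch of that two-step pattern. The topic name and VALUE_FORMAT='AVRO' are assumptions (the thread does not say which format the PRODUCTINSTANCE_VARIANT_OPTIONS topic uses), so adjust them to the real topic; the stream name matches the one mentioned in the question. A CREATE OR REPLACE STREAM ... AS SELECT would then read FROM the stream rather than the topic.

```sql
-- Step 1: register the existing topic as a stream.
-- Assumption: the topic holds Avro values with a schema in Schema Registry,
-- so no column list is needed.
CREATE STREAM PRODUCTINSTANCE_VARIANT_OPTIONS_STREAM
  WITH (KAFKA_TOPIC='PRODUCTINSTANCE_VARIANT_OPTIONS', VALUE_FORMAT='AVRO');

-- Step 2: queries (including CREATE STREAM ... AS SELECT) reference the
-- stream name, never the topic name directly.
SELECT * FROM PRODUCTINSTANCE_VARIANT_OPTIONS_STREAM EMIT CHANGES LIMIT 5;
```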

Thanks. Coming back to the original problem:

I still can’t create an Avro stream from the Avro topic.

An example Avro message from the topic is included below.

I am creating this stream:

CREATE STREAM xyz (
  _comment STRING,
  data STRUCT<
    productInstanceUuid STRING,
    productInstanceId STRING,
    productInstanceDescription STRING,
    productInstanceRevision STRING,
    productVariants ARRAY<STRUCT<operation STRING, TC_variant_id STRING, tc_variant_name STRING, variantOptions ARRAY<MAP<STRING, STRING>>>>
  >
)
WITH (KAFKA_TOPIC = 'TcProductVariants_Avro',
      VALUE_FORMAT = 'AVRO',
      TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss',
      PARTITIONS = 6);

I am getting this error in the ksqlDB processing log:

"DESERIALIZATIONERRORCAUSE": [
  "Cannot deserialize type array as type string for path: ->DATA->PRODUCTVARIANTS->ARRAY->VARIANTOPTIONS"
],
"DESERIALIZATIONERRORMESSAGE": "Error deserializing message from topic: TcProductVariants_Avro",

An example message from the topic:

{
  "_comment": "s)CPt}\n7I\u0002>q\u001b\u001b",
  "data": {
    "productInstanceUuid": "8d0d1ce3-53ac-4704-9b6e-6753e9b1c4ae",
    "productInstanceId": "P999888777",
    "productInstanceDescription": "City Hybrid",
    "productInstanceRevision": "A",
    "productVariants": [
      {
        "operation": "Add",
        "Tc_variant_id": "c4bf6e3d-bf7d-4950-93da-d59d21c29ec9",
        "Tc_variant_name": "red AND small",
        "VariantOptions": [
          {
            "color": "red",
            "quantity": "small"
          }
        ]
      }
    ]
  }
}

Is there any chance someone can help with this?
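The processing-log error says that a field declared as string at DATA->PRODUCTVARIANTS->ARRAY->VARIANTOPTIONS received an array. That matches the variantOptions STRING declaration in the first stream rather than the ARRAY<MAP<...>> in stream xyz, so the log entry may be coming from the older stream's query. Below is a hedged sketch of a column list that mirrors the sample message. It assumes VariantOptions is an Avro array of records with color and quantity fields; if the registered Avro schema uses a map instead, ARRAY<MAP<STRING, STRING>> would be the matching type. The stream name xyz_v2 is hypothetical, and PARTITIONS and TIMESTAMP_FORMAT are dropped because the topic already exists and no TIMESTAMP column is named. Omitting the column list entirely, as suggested earlier in the thread, sidesteps this kind of mismatch because ksqlDB then reads the types from Schema Registry.

```sql
-- Hypothetical stream name; the column list mirrors the sample message.
-- Assumption: VariantOptions is an Avro array of records {color, quantity}.
-- PARTITIONS is omitted because the topic already exists.
CREATE STREAM xyz_v2 (
  _comment STRING,
  data STRUCT<
    productInstanceUuid STRING,
    productInstanceId STRING,
    productInstanceDescription STRING,
    productInstanceRevision STRING,
    productVariants ARRAY<STRUCT<
      operation STRING,
      Tc_variant_id STRING,
      Tc_variant_name STRING,
      VariantOptions ARRAY<STRUCT<color STRING, quantity STRING>>
    >>
  >
)
WITH (KAFKA_TOPIC='TcProductVariants_Avro', VALUE_FORMAT='AVRO');

-- Read from the start of the topic so records Datagen already wrote are returned
SET 'auto.offset.reset'='earliest';
SELECT data->productVariants FROM xyz_v2 EMIT CHANGES LIMIT 5;
```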
