Hello Everyone,
I hope someone here can help me with this issue.
I have a lookup table which comes from an kafka topic provided via jdbc connector (using Avro key/value converter and schema registry).
key:
{"product_key": "0286-04KARTE"}
Value:
{"product_key":"0286-04KARTE","product_id":2888793,"customer_id":10004}
Then I get via REST Connector product attributes.
I want to JOIN the attribute STREAM with the product TABLE to enrich the products with those informations.
I do get data into flattend_product_attributes correctly but when I do the JOIN I get no results.
I do get warnings like:
[2022-02-07 16:39:47,715] WARN Skipping record due to null key. topic=[input_products] partition=[6] offset=[400275] (org.apache.kafka.streams.kstream.internals.KTableSource)
I suppose this is ok when the key is not existing in Table. But there are also product_keys which exists in both.
KSQLDB Statements:
CREATE TABLE products_lookup (
product_id BIGINT,
product_key VARCHAR PRIMARY KEY,
customer_id INT
) WITH (kafka_topic='input_products', partitions=30, key_format='AVRO', value_format='AVRO');
CREATE STREAM product_attributes
( gtin BIGINT,
created VARCHAR,
manufacturerAttributes STRUCT <
colorName VARCHAR,
"size" VARCHAR,
bteCode VARCHAR,
material VARCHAR,
functions VARCHAR,
name VARCHAR,
description VARCHAR,
colorCode VARCHAR,
shortDescription VARCHAR,
category VARCHAR,
manufacturedIn VARCHAR,
customsTariffNumber VARCHAR
>,
media STRUCT <
images ARRAY <STRUCT <
isPreliminary VARCHAR,
isMainImage VARCHAR,
shootType VARCHAR,
_id VARCHAR,
productView VARCHAR,
hash VARCHAR
>>>,
fcAttributes STRUCT <
animalParts VARCHAR,
targetGroup VARCHAR,
material VARCHAR,
careInstruction ARRAY<VARCHAR>,
season VARCHAR,
ageGroup VARCHAR,
category VARCHAR,
searchColors VARCHAR
>,
fcKeys STRUCT <
animalParts VARCHAR,
targetGroup VARCHAR,
materials ARRAY <STRUCT <
composition ARRAY < STRUCT <
material VARCHAR,
percentage INT
>
>,
partName VARCHAR
>
>,
careInstruction ARRAY<VARCHAR>,
season VARCHAR,
ageGroup VARCHAR,
category VARCHAR,
searchColors VARCHAR,
seasonYear VARCHAR
>,
articleNumber VARCHAR,
permissions STRUCT <
erp VARCHAR,
ecommerce VARCHAR,
digitalWindow VARCHAR
>,
colorGroupId VARCHAR,
styleGroupId VARCHAR,
_id VARCHAR,
brand STRUCT <
id VARCHAR
>,
updated VARCHAR
)
WITH (KAFKA_TOPIC='input_product_attributes',
VALUE_FORMAT='JSON',
partitions=30);
CREATE STREAM flattend_product_attributes
WITH (VALUE_FORMAT='AVRO') AS
SELECT
gtin,
created AS CREATEDAT,
manufacturerAttributes->colorName AS fc_mfa_colorName,
manufacturerAttributes->"size" AS fc_mfa_size,
manufacturerAttributes->bteCode AS fc_mfa_bteCode,
manufacturerAttributes->material AS fc_mfa_material,
manufacturerAttributes->functions AS fc_mfa_functions,
manufacturerAttributes->name AS fc_mfa_name,
manufacturerAttributes->description AS fc_mfa_description,
manufacturerAttributes->colorCode AS fc_mfa_colorCode,
manufacturerAttributes->shortDescription AS fc_mfa_shortDescription,
manufacturerAttributes->category AS fc_mfa_category,
manufacturerAttributes->manufacturedIn AS fc_mfa_manufacturedIn,
manufacturerAttributes->customsTariffNumber AS fc_mfa_customsTariffNumber,
EXPLODE(media->images)->isPreliminary AS fc_media_images_isPreliminary,
EXPLODE(media->images)->isMainImage AS fc_media_images_isMainImage,
EXPLODE(media->images)->shootType AS fc_media_images_shootType,
EXPLODE(media->images)->_id AS fc_media_images_id,
EXPLODE(media->images)->productView AS fc_media_images_productView,
EXPLODE(media->images)->hash AS fc_media_images_hash,
fcAttributes->animalParts AS fc_attr_animalParts,
fcAttributes->targetGroup AS fc_attr_targetGroup,
fcAttributes->material AS fc_attr_material,
EXPLODE(fcAttributes->careInstruction) AS fc_attr_careInstruction,
fcAttributes->season AS fc_attr_season,
fcAttributes->ageGroup AS fc_attr_ageGroup,
fcAttributes->category AS fc_attr_category,
fcAttributes->searchColors AS fc_attr_searchColors,
articleNumber AS product_key,
colorGroupId AS fc_colorGroupId,
styleGroupId AS fc_styleGroupId,
_id AS fc_id,
brand->id AS fc_brand_id,
updated AS CHANGEDAT
FROM product_attributes
PARTITION BY articleNumber;
CREATE STREAM product_attributes_flatten_data_combined
WITH (VALUE_FORMAT='AVRO' ) AS
SELECT
p.product_id,
a.product_key,
AS_VALUE(a.product_key) AS parent_product_key,
a.fc_mfa_colorName,
a.fc_mfa_bteCode,
a.fc_mfa_material,
a.fc_mfa_name,
a.fc_mfa_colorCode,
a.fc_mfa_description,
a.fc_mfa_shortDescription,
a.fc_mfa_category,
a.fc_mfa_manufacturedIn,
a.fc_mfa_customsTariffNumber,
a.fc_colorGroupId,
a.fc_styleGroupId,
a.fc_brand_id,
p.customer_id,
a.CREATEDAT,
a.CHANGEDAT
FROM flattend_product_attributes a
INNER JOIN products_lookup p ON a.product_key = p.product_key;