INNER JOIN STREAM with a Lookup-Table

Hello Everyone,
I hope someone here can help me with this issue.

I have a lookup table which comes from an kafka topic provided via jdbc connector (using Avro key/value converter and schema registry).

key:
{"product_key": "0286-04KARTE"}
Value:
{"product_key":"0286-04KARTE","product_id":2888793,"customer_id":10004}

Then I get via REST Connector product attributes.

I want to JOIN the attribute STREAM with the product TABLE to enrich the products with those informations.

I do get data into flattend_product_attributes correctly but when I do the JOIN I get no results.

I do get warnings like:

[2022-02-07 16:39:47,715] WARN Skipping record due to null key. topic=[input_products] partition=[6] offset=[400275] (org.apache.kafka.streams.kstream.internals.KTableSource)

I suppose this is ok when the key is not existing in Table. But there are also product_keys which exists in both.

KSQLDB Statements:

CREATE TABLE products_lookup (
                  product_id BIGINT,
                  product_key VARCHAR PRIMARY KEY,
                  customer_id INT
) WITH (kafka_topic='input_products', partitions=30, key_format='AVRO', value_format='AVRO');


CREATE STREAM product_attributes
                    ( gtin BIGINT,
                      created VARCHAR,
                      manufacturerAttributes STRUCT <
                        colorName VARCHAR,
                        "size" VARCHAR,
                        bteCode VARCHAR,
                        material VARCHAR,
                        functions VARCHAR,
                        name VARCHAR,
                        description VARCHAR,
                        colorCode VARCHAR,
                        shortDescription VARCHAR,
                        category VARCHAR,
                        manufacturedIn VARCHAR,
                        customsTariffNumber VARCHAR
                      >,
                      media STRUCT <
                        images ARRAY <STRUCT <
                            isPreliminary VARCHAR,
                            isMainImage VARCHAR,
                            shootType VARCHAR,
                            _id VARCHAR,
                            productView VARCHAR,
                            hash VARCHAR
                        >>>,
                      fcAttributes STRUCT <
                        animalParts VARCHAR,
                        targetGroup VARCHAR,
                        material VARCHAR,
                        careInstruction ARRAY<VARCHAR>,
                        season VARCHAR,
                        ageGroup VARCHAR,
                        category VARCHAR,
                        searchColors VARCHAR
                      >,
                      fcKeys STRUCT <
                        animalParts VARCHAR,
                        targetGroup VARCHAR,
                        materials ARRAY <STRUCT <
                            composition ARRAY < STRUCT <
                                material VARCHAR,
                                percentage INT
                              >
                            >,
                            partName VARCHAR
                            >
                          >,
                        careInstruction ARRAY<VARCHAR>,
                        season VARCHAR,
                        ageGroup VARCHAR,
                        category VARCHAR,
                        searchColors VARCHAR,
                        seasonYear VARCHAR
                      >,
                      articleNumber VARCHAR,
                      permissions STRUCT <
                        erp VARCHAR,
                        ecommerce VARCHAR,
                        digitalWindow VARCHAR
                      >,
                      colorGroupId VARCHAR,
                      styleGroupId VARCHAR,
                      _id VARCHAR,
                      brand STRUCT <
                        id VARCHAR
                      >,
                      updated VARCHAR
                        )
                        WITH (KAFKA_TOPIC='input_product_attributes',
                        VALUE_FORMAT='JSON',
                        partitions=30);

CREATE STREAM flattend_product_attributes
                        WITH (VALUE_FORMAT='AVRO') AS
                        SELECT
                            gtin,
                            created AS CREATEDAT,
                            manufacturerAttributes->colorName AS fc_mfa_colorName,
                            manufacturerAttributes->"size" AS fc_mfa_size,
                            manufacturerAttributes->bteCode AS fc_mfa_bteCode,
                            manufacturerAttributes->material AS fc_mfa_material,
                            manufacturerAttributes->functions AS fc_mfa_functions,
                            manufacturerAttributes->name AS fc_mfa_name,
                            manufacturerAttributes->description AS fc_mfa_description,
                            manufacturerAttributes->colorCode AS fc_mfa_colorCode,
                            manufacturerAttributes->shortDescription AS fc_mfa_shortDescription,
                            manufacturerAttributes->category AS fc_mfa_category,
                            manufacturerAttributes->manufacturedIn AS fc_mfa_manufacturedIn,
                            manufacturerAttributes->customsTariffNumber AS fc_mfa_customsTariffNumber,
                            EXPLODE(media->images)->isPreliminary AS fc_media_images_isPreliminary,
                            EXPLODE(media->images)->isMainImage AS fc_media_images_isMainImage,
                            EXPLODE(media->images)->shootType AS fc_media_images_shootType,
                            EXPLODE(media->images)->_id AS fc_media_images_id,
                            EXPLODE(media->images)->productView AS fc_media_images_productView,
                            EXPLODE(media->images)->hash AS fc_media_images_hash,
                            fcAttributes->animalParts AS fc_attr_animalParts,
                            fcAttributes->targetGroup AS fc_attr_targetGroup,
                            fcAttributes->material AS fc_attr_material,
                            EXPLODE(fcAttributes->careInstruction) AS fc_attr_careInstruction,
                            fcAttributes->season AS fc_attr_season,
                            fcAttributes->ageGroup AS fc_attr_ageGroup,
                            fcAttributes->category AS fc_attr_category,
                            fcAttributes->searchColors AS fc_attr_searchColors,
                            articleNumber AS product_key,
                            colorGroupId AS fc_colorGroupId,
                            styleGroupId AS fc_styleGroupId,
                            _id AS fc_id,
                            brand->id AS fc_brand_id,
                            updated AS CHANGEDAT
                        FROM product_attributes
                        PARTITION BY articleNumber;


CREATE STREAM product_attributes_flatten_data_combined
                        WITH (VALUE_FORMAT='AVRO' ) AS
                        SELECT
                            p.product_id,
                            a.product_key,
                            AS_VALUE(a.product_key) AS parent_product_key,
                            a.fc_mfa_colorName,
                            a.fc_mfa_bteCode,
                            a.fc_mfa_material,
                            a.fc_mfa_name,
                            a.fc_mfa_colorCode,
                            a.fc_mfa_description,
                            a.fc_mfa_shortDescription,
                            a.fc_mfa_category,
                            a.fc_mfa_manufacturedIn,
                            a.fc_mfa_customsTariffNumber,
                            a.fc_colorGroupId,
                            a.fc_styleGroupId,
                            a.fc_brand_id,
                            p.customer_id,
                            a.CREATEDAT,
                            a.CHANGEDAT
                        FROM flattend_product_attributes a
                        INNER JOIN products_lookup p ON a.product_key = p.product_key;

Its it connected with this?

Because the underlying Kafka Topic of product_attributes doesn not have a message key set.

thanks to this workaround I got it fixed.

Thank you @rmoff

3 Likes

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.