Facing issues with complex data processing use-case

Hello,
I’ve been trying to get a ksql query up and running which is meant to process the following records:

{
  "id": "2bc9461a-96e3-479f-95e2-ee7238954010",
  "lineItems": [
    {
      "referenceId": "3a37825f-6739-4f39-9b90-c91d4a48bd26",
      "type": "ARTICLE",
      "quantity": 1,
      "unitPrice": "5",
      "basePrice": "5",
      "lineTotal": "5",
      "taxRate": "7",
      "priceModifiers": []
    },
    {
      "referenceId": "aa3892ad-e5ca-4b26-81e6-da18a7de082a",
      "type": "ARTICLE",
      "quantity": 1,
      "unitPrice": "3",
      "basePrice": "3",
      "lineTotal": "3",
      "taxRate": "7",
      "priceModifiers": []
    }
  ],
  "articles": [
    {
      "id": "3a37825f-6739-4f39-9b90-c91d4a48bd26",
      "name": "Test1",
      "plu": "9001",
      "articleGroupId": "c2ac3dfc-c176-4655-b725-ae6eb01c60a7"
    },
    {
      "id": "aa3892ad-e5ca-4b26-81e6-da18a7de082a",
      "name": "Test2",
      "plu": "9002",
      "articleGroupId": "c2ac3dfc-c176-4655-b725-ae6eb01c60a7"
    }
  ]
}

Note that the article->id matches with some lineItem->referenceId.

I’m creating a ksql stream from the Kafka topic containing this data using:

CREATE STREAM receipts WITH (KAFKA_TOPIC='receipts', VALUE_FORMAT='avro');

What I want to do is answer the following question: within a given time frame (say 1 hour), how many of each article were sold, and how much revenue did they result in?

Essentially, from what I can see, I need a deduplicated list of articles which is then joined on the lineItems.

In plain SQL, this might look as follows (provided these entries were records in a relational database) for a global sales overview:

WITH articles_deduplicated AS (
    SELECT id, name, plu, articleGroupId
    FROM articles
    GROUP BY id, name, plu, articleGroupId
)
SELECT name, plu, articleGroupId, SUM(quantity) AS sum_quantity, SUM(line_total) AS sum_total 
FROM articles_deduplicated INNER JOIN line_items
ON article.id = line_item.referenceId
GROUP BY name, plu, articleGroupId 

My question is: how can I translate this into ksql?

I thought that I would be able to do the following to be able to answer this in ksqlDB:

  1. Produce a stream of exploded articles
CREATE STREAM articles_exploded AS 
SELECT
  id,
  EXPLODE(articles) as article
FROM receipts
PARTITION BY id

This looks as follows:

ksql> select * from articles_exploded;

+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|ID                                                                         |ARTICLE                                                                    |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|5952eb83-e227-4a6b-8d78-36e926e9aac4                                       |{ID=3a37825f-6739-4f39-9b90-c91d4a48bd26, NAME=Test1, PLU=9001, ARTICLEGROU|
|                                                                           |PID=c2ac3dfc-c176-4655-b725-ae6eb01c60a7}                                  |
|5952eb83-e227-4a6b-8d78-36e926e9aac4                                       |{ID=aa3892ad-e5ca-4b26-81e6-da18a7de082a, NAME=Test2, PLU=9002, ARTICLEGROU|
|                                                                           |PID=c2ac3dfc-c176-4655-b725-ae6eb01c60a7}                                  |
  1. Produce a stream containing the individual fields of article, rekeyed on the id:
CREATE STREAM articles_single AS 
SELECT
  article->id as article_id,
  article->name,
  article->plu,
  article->articlegroupid as article_group_id,
  id AS source_id
FROM articles_exploded
PARTITION BY article->id

This looks as follows:

+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|ARTICLE_ID                          |NAME                                |PLU                                 |ARTICLE_GROUP_ID                    |SOURCE_ID                           |
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|3a37825f-6739-4f39-9b90-c91d4a48bd26|Test1                               |9001                                |c2ac3dfc-c176-4655-b725-ae6eb01c60a7|5952eb83-e227-4a6b-8d78-36e926e9aac4|
|aa3892ad-e5ca-4b26-81e6-da18a7de082a|Test2                               |9002                                |c2ac3dfc-c176-4655-b725-ae6eb01c60a7|5952eb83-e227-4a6b-8d78-36e926e9aac4|
  1. Afterwards, I create a table containing the deduplicated articles:
CREATE TABLE articles AS
SELECT 
  article_id, 
  EARLIEST_BY_OFFSET(source_id) AS source_id,
  EARLIEST_BY_OFFSET(name) as name,
  EARLIEST_BY_OFFSET(plu) as plu,
  COUNT(*) AS num_bookings
FROM articles_single 
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY article_id;

This looks as follows:

+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|ARTICLE_ID               |WINDOWSTART              |WINDOWEND                |SOURCE_ID                |NAME                     |PLU                      |NUM_BOOKINGS             |
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|aa3892ad-e5ca-4b26-81e6-d|1699545600000            |1699549200000            |5952eb83-e227-4a6b-8d78-3|Test2                    |9002                     |1                        |
|a18a7de082a              |                         |                         |6e926e9aac4              |                         |                         |                         |
|3a37825f-6739-4f39-9b90-c|1699545600000            |1699549200000            |5952eb83-e227-4a6b-8d78-3|Test1                    |9001                     |1                        |
|91d4a48bd26              |                         |                         |6e926e9aac4              |                         |                         |                         |
  1. Produce a stream containing the exploded lineItems:
 CREATE STREAM line_items_exploded AS
 SELECT
  id,
  EXPLODE(lineitems) AS line_item
FROM receipts
PARTITION BY id

This looks as follows:

+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
|ID                                                                                            |LINE_ITEM                                                                                     |
+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
|5952eb83-e227-4a6b-8d78-36e926e9aac4                                                          |{BASEPRICE=5.0, LINETOTAL=5.0, PRICEMODIFIERS=[], QUANTITY=1, REFERENCEID=3a37825f-6739-4f39-9|
|                                                                                              |b90-c91d4a48bd26, PARENTITEMID=null, TAXRATE=7.0, TYPE=ARTICLE, UNITPRICE=5.0}                |
|5952eb83-e227-4a6b-8d78-36e926e9aac4                                                          |{BASEPRICE=3.0, LINETOTAL=3.0, PRICEMODIFIERS=[], QUANTITY=1, REFERENCEID=aa3892ad-e5ca-4b26-8|
|                                                                                              |1e6-da18a7de082a, PARENTITEMID=null, TAXRATE=7.0, TYPE=ARTICLE, UNITPRICE=3.0}                |
  1. Produce another stream containing the individual fields of line_item, rekeyed on referenceId:
CREATE STREAM line_items_single AS
 SELECT
  line_item->referenceId AS reference_id,
  id AS source_id,
  line_item->basePrice AS base_price,
  line_item->quantity,
  line_item->lineTotal AS line_total
FROM line_items_exploded
PARTITION BY line_item->referenceId

Finally, this looks as follows:

+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|REFERENCE_ID                        |SOURCE_ID                           |BASE_PRICE                          |QUANTITY                            |LINE_TOTAL                          |
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|3a37825f-6739-4f39-9b90-c91d4a48bd26|5952eb83-e227-4a6b-8d78-36e926e9aac4|5.000                               |1                                   |5.000                               |
|aa3892ad-e5ca-4b26-81e6-da18a7de082a|5952eb83-e227-4a6b-8d78-36e926e9aac4|3.000                               |1                                   |3.000                               |

Afterwards, I thought I could perform the following query:

ksql> select * from line_items_single join articles on line_items_single.reference_id = articles.id;

But this yields the following:

Can not join windowed source to non-windowed source.
`LINE_ITEMS_SINGLE` is not windowed
`ARTICLES` is TUMBLING windowed

This is documented in the relevant dev guide section:

Stream-table joins are always non-windowed joins. You can perform table lookups against a table when a new record arrives on the stream. Only events arriving on the stream side trigger downstream updates and produce join output. Updates on the table side don’t produce updated join output.

My question is: how would I solve this problem otherwise? I thought about potentially using a Stream-Stream join and enriching either articles or line_items_single with information in the other, but the requirement for the article data to be deduplicated logically requires, from what I can see, a Stream-Table join.

If I remove the WINDOW TUMBLING (SIZE 1 HOUR) from my articles Table query, I can do the following:

select 
    articles.source_id, 
    articles.article_id, 
    name, 
    plu, 
    latest_by_offset(num_bookings) as total_bookings,
    latest_by_offset(base_price) as base_price, 
    sum(quantity) as sum_quantity, 
    sum(line_total) as sum_total 
from line_items_single 
join articles 
    on line_items_single.reference_id = articles.article_id 
group by articles.source_id, article_id, name, plu emit changes;

I do get the result that I want (note that I produced some more records in the meantime, hence why the numbers are higher):

+------------------------------------+------------------------------------+----------------------+----------------------+---------------------+----------------------+----------------------+-----------------------+
|SOURCE_ID                           |ARTICLE_ID                          |NAME                  |PLU                   |TOTAL_BOOKINGS        |BASE_PRICE            |SUM_QUANTITY          |SUM_TOTAL             |
+------------------------------------+------------------------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|5952eb83-e227-4a6b-8d78-36e926e9aac4|aa3892ad-e5ca-4b26-81e6-da18a7de082a|Test2                 |9002                  |7                     |3.000                 |5                     |15.000                |
|5952eb83-e227-4a6b-8d78-36e926e9aac4|3a37825f-6739-4f39-9b90-c91d4a48bd26|Test1                 |9001                  |7                     |5.000                 |5                     |25.000                |

But from a logical standpoint it doesn’t make sense for me for articles to not be windowed since we would otherwise be building a global table over the entire stream. Since the number of entries may be quite large, this seems counter-intuitive.

Is there a more elegant solution that I’m not seeing, or have I misunderstood the windowing concepts?

FWIW: I’ve crossposted this to Stack Overflow in the hopes of getting a bit more exposure…

This topic was automatically closed after 30 days. New replies are no longer allowed.