Hello,
I’ve been trying to get a ksql query up and running that is meant to process records like the following:
{
  "id": "2bc9461a-96e3-479f-95e2-ee7238954010",
  "lineItems": [
    {
      "referenceId": "3a37825f-6739-4f39-9b90-c91d4a48bd26",
      "type": "ARTICLE",
      "quantity": 1,
      "unitPrice": "5",
      "basePrice": "5",
      "lineTotal": "5",
      "taxRate": "7",
      "priceModifiers": []
    },
    {
      "referenceId": "aa3892ad-e5ca-4b26-81e6-da18a7de082a",
      "type": "ARTICLE",
      "quantity": 1,
      "unitPrice": "3",
      "basePrice": "3",
      "lineTotal": "3",
      "taxRate": "7",
      "priceModifiers": []
    }
  ],
  "articles": [
    {
      "id": "3a37825f-6739-4f39-9b90-c91d4a48bd26",
      "name": "Test1",
      "plu": "9001",
      "articleGroupId": "c2ac3dfc-c176-4655-b725-ae6eb01c60a7"
    },
    {
      "id": "aa3892ad-e5ca-4b26-81e6-da18a7de082a",
      "name": "Test2",
      "plu": "9002",
      "articleGroupId": "c2ac3dfc-c176-4655-b725-ae6eb01c60a7"
    }
  ]
}
Note that each article->id matches some lineItem->referenceId.
I’m creating a ksql stream from the Kafka topic containing this data using:
CREATE STREAM receipts WITH (KAFKA_TOPIC='receipts', VALUE_FORMAT='avro');
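(For completeness: the column definitions come from Schema Registry. If I were to spell them out by hand, I believe it would look roughly like the following; receipts_explicit is just a placeholder name, and the types are guesses based on the sample record and the query output further below.)
CREATE STREAM receipts_explicit (
    id VARCHAR,
    lineItems ARRAY<STRUCT<
        referenceId VARCHAR,
        `type` VARCHAR,
        quantity INT,
        unitPrice DECIMAL(10, 3),
        basePrice DECIMAL(10, 3),
        lineTotal DECIMAL(10, 3),
        taxRate DECIMAL(10, 3),
        priceModifiers ARRAY<VARCHAR>  -- type is a guess; the sample only shows an empty array
    >>,
    articles ARRAY<STRUCT<
        id VARCHAR,
        name VARCHAR,
        plu VARCHAR,
        articleGroupId VARCHAR
    >>
) WITH (KAFKA_TOPIC='receipts', VALUE_FORMAT='AVRO');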
What I want to do is answer the following question: within a given time frame (say 1 hour), how many of each article were sold, and how much revenue did they result in?
Essentially, from what I can see, I need a deduplicated list of articles that is then joined to the lineItems.
In plain SQL (assuming these entries were rows in a relational database), a global sales overview might look as follows:
WITH articles_deduplicated AS (
    SELECT id, name, plu, articleGroupId
    FROM articles
    GROUP BY id, name, plu, articleGroupId
)
SELECT a.name, a.plu, a.articleGroupId,
       SUM(li.quantity) AS sum_quantity,
       SUM(li.lineTotal) AS sum_total
FROM articles_deduplicated a
INNER JOIN line_items li ON a.id = li.referenceId
GROUP BY a.name, a.plu, a.articleGroupId;
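With the one-hour time frame folded in, the plain-SQL version would additionally bucket by time, roughly like this (a sketch that assumes a hypothetical booked_at timestamp column on line_items, which isn't part of the example data, and uses Postgres-style date_trunc):
WITH articles_deduplicated AS (
    SELECT id, name, plu, articleGroupId
    FROM articles
    GROUP BY id, name, plu, articleGroupId
)
SELECT a.name, a.plu, a.articleGroupId,
       date_trunc('hour', li.booked_at) AS sales_hour,  -- booked_at is hypothetical
       SUM(li.quantity) AS sum_quantity,
       SUM(li.lineTotal) AS sum_total
FROM articles_deduplicated a
INNER JOIN line_items li ON a.id = li.referenceId
GROUP BY a.name, a.plu, a.articleGroupId, date_trunc('hour', li.booked_at);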
My question is: how can I translate this into ksql?
I thought I would be able to answer this in ksqlDB with the following steps:
- Produce a stream of exploded articles:
CREATE STREAM articles_exploded AS
    SELECT
        id,
        EXPLODE(articles) AS article
    FROM receipts
    PARTITION BY id;
This looks as follows:
ksql> select * from articles_exploded;
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|ID |ARTICLE |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|5952eb83-e227-4a6b-8d78-36e926e9aac4 |{ID=3a37825f-6739-4f39-9b90-c91d4a48bd26, NAME=Test1, PLU=9001, ARTICLEGROU|
| |PID=c2ac3dfc-c176-4655-b725-ae6eb01c60a7} |
|5952eb83-e227-4a6b-8d78-36e926e9aac4 |{ID=aa3892ad-e5ca-4b26-81e6-da18a7de082a, NAME=Test2, PLU=9002, ARTICLEGROU|
| |PID=c2ac3dfc-c176-4655-b725-ae6eb01c60a7} |
- Produce a stream containing the individual fields of article, rekeyed on article->id:
CREATE STREAM articles_single AS
    SELECT
        article->id AS article_id,
        article->name,
        article->plu,
        article->articlegroupid AS article_group_id,
        id AS source_id
    FROM articles_exploded
    PARTITION BY article->id;
This looks as follows:
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|ARTICLE_ID |NAME |PLU |ARTICLE_GROUP_ID |SOURCE_ID |
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|3a37825f-6739-4f39-9b90-c91d4a48bd26|Test1 |9001 |c2ac3dfc-c176-4655-b725-ae6eb01c60a7|5952eb83-e227-4a6b-8d78-36e926e9aac4|
|aa3892ad-e5ca-4b26-81e6-da18a7de082a|Test2 |9002 |c2ac3dfc-c176-4655-b725-ae6eb01c60a7|5952eb83-e227-4a6b-8d78-36e926e9aac4|
- Afterwards, I create a table containing the deduplicated articles:
CREATE TABLE articles AS
    SELECT
        article_id,
        EARLIEST_BY_OFFSET(source_id) AS source_id,
        EARLIEST_BY_OFFSET(name) AS name,
        EARLIEST_BY_OFFSET(plu) AS plu,
        COUNT(*) AS num_bookings
    FROM articles_single
    WINDOW TUMBLING (SIZE 1 HOUR)
    GROUP BY article_id;
This looks as follows:
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|ARTICLE_ID |WINDOWSTART |WINDOWEND |SOURCE_ID |NAME |PLU |NUM_BOOKINGS |
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|aa3892ad-e5ca-4b26-81e6-d|1699545600000 |1699549200000 |5952eb83-e227-4a6b-8d78-3|Test2 |9002 |1 |
|a18a7de082a | | |6e926e9aac4 | | | |
|3a37825f-6739-4f39-9b90-c|1699545600000 |1699549200000 |5952eb83-e227-4a6b-8d78-3|Test1 |9001 |1 |
|91d4a48bd26 | | |6e926e9aac4 | | | |
- Produce a stream containing the exploded lineItems:
CREATE STREAM line_items_exploded AS
    SELECT
        id,
        EXPLODE(lineitems) AS line_item
    FROM receipts
    PARTITION BY id;
This looks as follows:
+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
|ID |LINE_ITEM |
+----------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------+
|5952eb83-e227-4a6b-8d78-36e926e9aac4 |{BASEPRICE=5.0, LINETOTAL=5.0, PRICEMODIFIERS=[], QUANTITY=1, REFERENCEID=3a37825f-6739-4f39-9|
| |b90-c91d4a48bd26, PARENTITEMID=null, TAXRATE=7.0, TYPE=ARTICLE, UNITPRICE=5.0} |
|5952eb83-e227-4a6b-8d78-36e926e9aac4 |{BASEPRICE=3.0, LINETOTAL=3.0, PRICEMODIFIERS=[], QUANTITY=1, REFERENCEID=aa3892ad-e5ca-4b26-8|
| |1e6-da18a7de082a, PARENTITEMID=null, TAXRATE=7.0, TYPE=ARTICLE, UNITPRICE=3.0} |
- Produce another stream containing the individual fields of line_item, rekeyed on referenceId:
CREATE STREAM line_items_single AS
    SELECT
        line_item->referenceId AS reference_id,
        id AS source_id,
        line_item->basePrice AS base_price,
        line_item->quantity,
        line_item->lineTotal AS line_total
    FROM line_items_exploded
    PARTITION BY line_item->referenceId;
Finally, this looks as follows:
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|REFERENCE_ID |SOURCE_ID |BASE_PRICE |QUANTITY |LINE_TOTAL |
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|3a37825f-6739-4f39-9b90-c91d4a48bd26|5952eb83-e227-4a6b-8d78-36e926e9aac4|5.000 |1 |5.000 |
|aa3892ad-e5ca-4b26-81e6-da18a7de082a|5952eb83-e227-4a6b-8d78-36e926e9aac4|3.000 |1 |3.000 |
Afterwards, I thought I could perform the following query:
ksql> select * from line_items_single join articles on line_items_single.reference_id = articles.article_id;
But this yields the following:
Can not join windowed source to non-windowed source.
`LINE_ITEMS_SINGLE` is not windowed
`ARTICLES` is TUMBLING windowed
This is documented in the relevant dev guide section:
Stream-table joins are always non-windowed joins. You can perform table lookups against a table when a new record arrives on the stream. Only events arriving on the stream side trigger downstream updates and produce join output. Updates on the table side don’t produce updated join output.
My question is: how would I solve this problem otherwise? I thought about potentially using a Stream-Stream join and enriching either articles or line_items_single with information from the other (roughly sketched below), but the requirement for the article data to be deduplicated logically requires, from what I can see, a Stream-Table join.
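For reference, the Stream-Stream variant I had in mind would be something like the following (untested sketch; the WITHIN bound is a guess). It enriches each line item with whichever article occurrence happens to match, not a deduplicated one, which is exactly the issue:
-- join every line item to the matching (non-deduplicated) article occurrence
SELECT
    li.source_id,
    a.article_id,
    a.name,
    a.plu,
    li.quantity,
    li.line_total
FROM line_items_single li
    JOIN articles_single a WITHIN 1 HOUR
    ON li.reference_id = a.article_id
EMIT CHANGES;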
If I remove the WINDOW TUMBLING (SIZE 1 HOUR) from my articles table query, I can do the following:
select
    articles.source_id,
    articles.article_id,
    name,
    plu,
    latest_by_offset(num_bookings) as total_bookings,
    latest_by_offset(base_price) as base_price,
    sum(quantity) as sum_quantity,
    sum(line_total) as sum_total
from line_items_single
    join articles
    on line_items_single.reference_id = articles.article_id
group by articles.source_id, article_id, name, plu
emit changes;
I do get the result that I want (note that I produced some more records in the meantime, which is why the numbers are higher):
+------------------------------------+------------------------------------+----------------------+----------------------+---------------------+----------------------+----------------------+-----------------------+
|SOURCE_ID |ARTICLE_ID |NAME |PLU |TOTAL_BOOKINGS |BASE_PRICE |SUM_QUANTITY |SUM_TOTAL |
+------------------------------------+------------------------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|5952eb83-e227-4a6b-8d78-36e926e9aac4|aa3892ad-e5ca-4b26-81e6-da18a7de082a|Test2 |9002 |7 |3.000 |5 |15.000 |
|5952eb83-e227-4a6b-8d78-36e926e9aac4|3a37825f-6739-4f39-9b90-c91d4a48bd26|Test1 |9001 |7 |5.000 |5 |25.000 |
But from a logical standpoint it doesn’t make sense to me for articles not to be windowed, since we would otherwise be building a global table over the entire stream. Since the number of entries may be quite large, this seems counter-intuitive.
Is there a more elegant solution that I’m not seeing, or have I misunderstood the windowing concepts?
FWIW: I’ve crossposted this to Stack Overflow in the hopes of getting a bit more exposure…