How to join 2 streams based on a common column?

Hi all…

I have 2 streams, salesbasket and salespayments.
When messages are posted to the topics they are keyed based on store id.

I’ve created the two streams below. If I add the word KEY next to InvoiceNumber then the key of the topic is pushed in there, which is not what I want. What I thought the KEY keyword does is define that column as the key to join on, like an index.

How can I join these 2 streams into a 3rd output stream based on matching InvoiceNumber values? The final plan is to emit the new stream into a new topic that can be consumed.

-- salesbasket
CREATE STREAM salesbasket (
    InvoiceNumber VARCHAR,
    SaleDateTime VARCHAR,
    SaleTimestamp TIMESTAMP,
    TerminalPoint VARCHAR,
    Nett DOUBLE,
    Vat DOUBLE,
    Total DOUBLE,
    Store STRUCT<
        Id VARCHAR,
        Name VARCHAR>,
    Clerk STRUCT<
        Id VARCHAR,
        Name VARCHAR>,
    BasketItems ARRAY<STRUCT<
        Id VARCHAR,
        Name VARCHAR,
        Brand VARCHAR,
        Category VARCHAR,
        Price DOUBLE,
        Quantity INTEGER>>)
WITH (KAFKA_TOPIC='salesbasket',
      VALUE_FORMAT='JSON',
      PARTITIONS=1);
       
       
-- salespayments       
CREATE STREAM salespayments (
    InvoiceNumber VARCHAR,
    FinTransactionID VARCHAR,
    PayDateTime VARCHAR,
    PayTimestamp TIMESTAMP,
    Paid DOUBLE)
WITH (KAFKA_TOPIC='salespayments',
      VALUE_FORMAT='JSON',
      PARTITIONS=1);

keyed based on store id.

The KEY keyword tells ksqlDB about the actual physical layout of the data. Thus, if the store ID is in the message key, you should use StoreId <type> KEY to tell ksqlDB how to read the data correctly.
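
To make this concrete, here is a minimal sketch of a declaration that reads the store id from the message key. The stream name salespayments_keyed and the column name StoreId are made up for illustration, and it assumes the key is a plain string in the default KAFKA key format, so adjust it to your setup.

```sql
-- Hypothetical stream over the existing salespayments topic, with the message key exposed.
CREATE STREAM salespayments_keyed (
    StoreId          VARCHAR KEY,  -- read from the Kafka message key (assumed to be a plain string)
    InvoiceNumber    VARCHAR,      -- read from the JSON message value
    FinTransactionID VARCHAR,
    PayDateTime      VARCHAR,
    PayTimestamp     TIMESTAMP,
    Paid             DOUBLE
) WITH (
    KAFKA_TOPIC  = 'salespayments',
    VALUE_FORMAT = 'JSON'
);
```

Only the column marked KEY comes from the message key. Every other column, including InvoiceNumber, is still read from the value.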

If I add the word KEY next to InvoiceNumber then the key of the topic is pushed in there

That is not what will happen, cf my comment above. If you do this, ksqlDB will try to read the invoiceNumber column from the message key. The KEY keyword is really a “schema description” that tells ksqlDB how to read the data from the topic.

How can I join these 2 streams into a 3rd output stream based on matching InvoiceNumber values?

You just need to do salesbasket JOIN salespayments ON salesbasket.invoiceNumber = salespayments.invoiceNumber (plus the required window clause for the stream-stream join).
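
Spelled out, that could look roughly like the sketch below. The 1 hour window and the two selected columns are arbitrary choices for illustration; pick a window that covers the longest expected gap between a basket and its payment.

```sql
-- Minimal stream-stream join sketch; the window size is an assumption.
SELECT
    b.InvoiceNumber,
    p.Paid
FROM salesbasket b
    INNER JOIN salespayments p
    WITHIN 1 HOURS                       -- window clause required for stream-stream joins
    ON b.InvoiceNumber = p.InvoiceNumber
EMIT CHANGES;                            -- joins only run as push queries
```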


This makes a lot of sense for someone who has done it a lot… for someone who needs to do it for the first time… Google time again.

At the moment the message header includes the key, which defines the partition the record is placed on… at the moment this is the store id…

If I add the word KEY next to InvoiceNumber in my CREATE STREAM then the partition key is pulled in and the value of InvoiceNumber is replaced by it.


First attempt to get a minimal join.

select 
   salesBASKET.invoicenumber, 
   salesbasket.SaleTimestamp, 
   salespayments.paid, 
   salespayments.PayDateTime 
from 
  SALESPAYMENTS, 
  SALESBASKET 
JOIN 
  salespayments ON salesbasket.invoiceNumber = salespayments.invoiceNumber;

→ Error line 1:126: Syntax Error

Statement: select salesBASKET.invoicenumber, salesbasket.SaleTimestamp, salespayments.paid, salespayments.PayDateTime from SALESPAYMENTS, SALESBASKET JOIN salespayments ON salesbasket.invoiceNumber = salespayments.invoiceNumber;
Caused by: line 1:126: Syntax error at line 1:126


Another attempt, different examples

select
   b.invoicenumber,
   b.SaleTimestamp,
   p.paid,
   p.PayDateTime
from
   salespayments p INNER JOIN
   salesbasket b
WITHIN 7 DAYS
on b.invoiceNumber = p.invoiceNumber;

Pull queries don’t support JOIN clauses. See Queries - ksqlDB Documentation for more info.
Add EMIT CHANGES if you intended to issue a push query.


Made some progress, this returned the columns from the join clause.
Now to try and add the other columns, which also include objects.

select 
	b.invoicenumber, 
	p.invoicenumber, 
	b.SaleTimestamp, 
	p.PayTimestamp
from 
	salespayments p INNER JOIN
	salesbasket b
WITHIN 7 DAYS 
on b.invoiceNumber = p.invoiceNumber
emit changes;

yay… getting there

select 
	b.invoicenumber, 
	p.invoicenumber, 
	b.SaleTimestamp, 
	p.PayTimestamp,
	b.store,
	b.clerk
from 
	salespayments p INNER JOIN
	salesbasket b
WITHIN 7 DAYS 
on b.invoiceNumber = p.invoiceNumber
emit changes;

… total query…
now to figure out how to push this to a new stream and then to a new topic.

select 
	b.invoicenumber, 
	b.SaleDateTime,
	b.SaleTimestamp, 
	b.TerminalPoint,
	b.Nett,
	b.Vat,
	b.Total,
	b.store,
	b.clerk,
	b.BasketItems,
	p.FinTransactionID,
	p.PayDateTime,
	p.PayTimestamp,
	p.Paid
from 
	salespayments p INNER JOIN
	salesbasket b
WITHIN 7 DAYS 
on b.invoiceNumber = p.invoiceNumber
emit changes;

Done… Got what I was trying to accomplish…

CREATE STREAM salescompleted as  
select 
	b.InvoiceNumber as InvNumber, 
	b.SaleDateTime,
	b.SaleTimestamp, 
	b.TerminalPoint,
	b.Nett,
	b.Vat,
	b.Total,
	b.store,
	b.clerk,
	b.BasketItems,
	p.FinTransactionID,
	p.PayDateTime,
	p.PayTimestamp,
	p.Paid
from 
	salespayments p INNER JOIN
	salesbaskets b
WITHIN 7 DAYS 
on b.InvoiceNumber = p.InvoiceNumber
emit changes;

Yes, because you told ksqlDB to read the column invoiceNumber from the message key, by adding the KEY keyword. If you want ksqlDB to read invoiceNumber from the message value, you must omit KEY.

Given that your data is partitioned by something in the message header, but not the actual message key, ksqlDB gets “confused”, if you will: ksqlDB expects data to be partitioned by key, and it cannot understand other custom partitioning.
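
If you ever want a topic that is explicitly keyed and partitioned by invoice number, a sketch like the following would re-key one of the streams. The stream name salespayments_by_invoice is made up for illustration. As far as I understand, this step is optional for the join itself, since ksqlDB repartitions internally when the join column is not the key.

```sql
-- Hypothetical re-keyed copy of salespayments: InvoiceNumber becomes the message key.
CREATE STREAM salespayments_by_invoice AS
    SELECT *
    FROM salespayments
    PARTITION BY InvoiceNumber
    EMIT CHANGES;
```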

now to figure out how to push this to a new stream and then to a new topic.

That’s one step. Add CREATE STREAM <name> WITH (...) AS... as a prefix (and use the WITH clause if you want to specify a specific topic name; otherwise the stream name will be used as the topic name). There is no STREAM without a topic that stores the STREAM’s data.

i’ve tried various ways …

my ksql stream is in caps… => happy.
I keep my topics in lower case, but for some reason I can’t get CREATE STREAM … WITH (…) AS to create the output topic in lower case.

Mind sharing an example for my CREATE above?

G

Can you share the statement? I believe WITH (KAFKA_TOPIC='my_topic_name') should be case sensitive already.

At least this is how I read the docs: CREATE STREAM AS SELECT - ksqlDB Documentation

this is where I create the stream by combining 2 streams, which is backed with a topic.

The WITH (KAFKA_TOPIC='my_topic_name') you reference I understood as the way to create a stream that consumes from a topic (backed by a topic for persistence as the source), not to output to a topic as the destination.

thanks for the doc, will go scan it in the morning.

I’m trying to force the output topic to lower case.

G

There is CREATE STREAM <schema> and CREATE STREAM... AS SELECT. Both support the WITH clause. For the former, you can think of it as the “input topic” and for the latter as the “result topic” when you specify the topic name.
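
Side by side, the two forms might look like the sketch below. The stream names payments_in and payments_out and the topic name payments_out_lowercase are hypothetical, chosen only to show where the topic name goes in each form.

```sql
-- Form 1: CREATE STREAM <schema>. KAFKA_TOPIC names the topic the stream reads from.
CREATE STREAM payments_in (
    InvoiceNumber VARCHAR,
    Paid          DOUBLE
) WITH (
    KAFKA_TOPIC  = 'salespayments',
    VALUE_FORMAT = 'JSON'
);

-- Form 2: CREATE STREAM ... AS SELECT. KAFKA_TOPIC names the topic the results are written to.
CREATE STREAM payments_out
WITH (
    KAFKA_TOPIC  = 'payments_out_lowercase',  -- hypothetical topic name, kept in lower case
    VALUE_FORMAT = 'JSON'
) AS
SELECT InvoiceNumber, Paid
FROM payments_in
EMIT CHANGES;
```

The stream name is upper-cased by ksqlDB, while the quoted topic name in the WITH clause is used as written.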


Solved… thanks @mjsax

All 3 streams, the 2 source and the 3rd combined.

CREATE STREAM salesbaskets (
    InvoiceNumber VARCHAR,
    SaleDateTime VARCHAR,
    SaleTimestamp TIMESTAMP,
    TerminalPoint VARCHAR,
    Nett DOUBLE,
    Vat DOUBLE,
    Total DOUBLE,
    Store STRUCT<
        Id VARCHAR,
        Name VARCHAR>,
    Clerk STRUCT<
        Id VARCHAR,
        Name VARCHAR>,
    BasketItems ARRAY<STRUCT<
        Id VARCHAR,
        Name VARCHAR,
        Brand VARCHAR,
        Category VARCHAR,
        Price DOUBLE,
        Quantity INTEGER>>)
WITH (KAFKA_TOPIC='salesbaskets',
      VALUE_FORMAT='JSON',
      PARTITIONS=1);

CREATE STREAM salespayments (
    InvoiceNumber VARCHAR,
    FinTransactionID VARCHAR,
    PayDateTime VARCHAR,
    PayTimestamp TIMESTAMP,
    Paid DOUBLE)
WITH (KAFKA_TOPIC='salespayments',
      VALUE_FORMAT='JSON',
      PARTITIONS=1);
     
CREATE STREAM salescompleted WITH (KAFKA_TOPIC='salescompleted',
       VALUE_FORMAT='JSON',
       PARTITIONS=1)
       as  
select 
	b.InvoiceNumber InvNumber, 
	b.SaleDateTime,
	b.SaleTimestamp, 
	b.TerminalPoint,
	b.Nett,
	b.Vat,
	b.Total,
	b.store,
	b.clerk,
	b.BasketItems,
	p.FinTransactionID,
	p.PayDateTime,
	p.PayTimestamp,
	p.Paid
from 
	salespayments p INNER JOIN
	salesbaskets b
WITHIN 7 DAYS 
on b.InvoiceNumber = p.InvoiceNumber
emit changes;
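
For anyone following along, a quick way to check the result from the ksqlDB CLI could look like this. It is only a sketch; the LIMIT of 5 is arbitrary.

```sql
-- List topics; the output topic should appear with the lower-case name given in the WITH clause.
SHOW TOPICS;

-- Show the schema of the new stream and the topic backing it.
DESCRIBE salescompleted EXTENDED;

-- Read a few joined records straight from the output topic.
PRINT 'salescompleted' FROM BEGINNING LIMIT 5;
```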
