I have two streams, salesbasket and salespayments.
When records are posted to the topics, they are keyed by store ID.
I’ve created the two streams below. If I add the word KEY next to InvoiceNumber, the key of the topic is pushed into that column, which is not what I want. I thought the KEY keyword defines the column to join on, i.e. an index.
How can I join these two streams into a third output stream based on matching InvoiceNumbers? The final plan is to emit the new stream into a new topic that can be consumed.
-- salesbasket
CREATE STREAM salesbasket (
  InvoiceNumber VARCHAR,
  SaleDateTime VARCHAR,
  SaleTimestamp TIMESTAMP,
  TerminalPoint VARCHAR,
  Nett DOUBLE,
  Vat DOUBLE,
  Total DOUBLE,
  Store STRUCT<
    Id VARCHAR,
    Name VARCHAR>,
  Clerk STRUCT<
    Id VARCHAR,
    Name VARCHAR>,
  BasketItems ARRAY<STRUCT<
    id VARCHAR,
    Name VARCHAR,
    Brand VARCHAR,
    Catergory VARCHAR,
    Price DOUBLE,
    Quantity INTEGER>>)
WITH (KAFKA_TOPIC='salesbasket',
      VALUE_FORMAT='JSON',
      PARTITIONS=1);
-- salespayments
CREATE STREAM salespayments (
  InvoiceNumber VARCHAR,
  FinTransactionID VARCHAR,
  PayDateTime VARCHAR,
  PayTimestamp TIMESTAMP,
  Paid DOUBLE)
WITH (KAFKA_TOPIC='salespayments',
      VALUE_FORMAT='JSON',
      PARTITIONS=1);
The KEY keyword tells ksqlDB about the actual physical layout of the data. Thus, if the store ID is in the message key, you should declare StoreId <type> KEY so that ksqlDB reads the data correctly.
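As a sketch, assuming the store ID is written to the message key as a plain string (the KAFKA key format) and using StoreId as a hypothetical column name, a declaration that reads the key and the value separately could look like this. It would replace the salespayments declaration above, so drop the existing stream first or pick a new name; KEY_FORMAT requires a ksqlDB version that supports it.

```sql
-- Hypothetical declaration: StoreId is read from the Kafka message key,
-- every other column is read from the JSON message value.
CREATE STREAM salespayments (
  StoreId VARCHAR KEY,          -- assumed: key bytes are a plain string
  InvoiceNumber VARCHAR,        -- read from the value, so no KEY here
  FinTransactionID VARCHAR,
  PayDateTime VARCHAR,
  PayTimestamp TIMESTAMP,
  Paid DOUBLE)
WITH (KAFKA_TOPIC='salespayments',
      KEY_FORMAT='KAFKA',
      VALUE_FORMAT='JSON',
      PARTITIONS=1);
```

Only the column that actually lives in the message key gets KEY; InvoiceNumber stays an ordinary value column and can still be used in a join condition.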
If I add the word KEY next to InvoiceNumber, then the key of the topic is pushed in there.
That is not what happens (see my comment above). If you do this, ksqlDB will try to read the InvoiceNumber column from the message key. The KEY keyword is really a “schema description” that tells ksqlDB how to read the data from the topic.
How can I join these two streams into a third output stream based on matching InvoiceNumbers?
You just need to do salesbasket JOIN salespayments ON salesbasket.invoiceNumber = salespayments.invoiceNumber (plus the required WITHIN clause for the stream-stream join).
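A minimal sketch of that statement against the two streams declared above. The 7-day window and the selected columns are illustrative assumptions; pick a window that covers the longest expected gap between a basket and its payment.

```sql
-- Stream-stream join on InvoiceNumber (a value column on both sides).
-- WITHIN is mandatory for stream-stream joins.
SELECT b.InvoiceNumber,
       b.Total,
       p.Paid
FROM salesbasket b
  INNER JOIN salespayments p WITHIN 7 DAYS
  ON b.InvoiceNumber = p.InvoiceNumber
EMIT CHANGES;  -- push query: joins are not supported in pull queries
```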
At the moment the message header includes the key, which defines on which partition the record is placed… at the moment this is the store ID…
If I add the word KEY next to InvoiceNumber in my CREATE STREAM, then the partition key is pulled in and the value of InvoiceNumber is replaced by this value.
select
salesBASKET.invoicenumber,
salesbasket.SaleTimestamp,
salespayments.paid,
salespayments.PayDateTime
from
SALESPAYMENTS,
SALESBASKET
JOIN
salespayments ON salesbasket.invoiceNumber = salespayments.invoiceNumber;
→ Error line 1:126: Syntax Error
Statement: select salesBASKET.invoicenumber, salesbasket.SaleTimestamp, salespayments.paid, salespayments.PayDateTime from SALESPAYMENTS, SALESBASKET JOIN salespayments ON salesbasket.invoiceNumber = salespayments.invoiceNumber;
Caused by: line 1:126: Syntax error at line 1:126
select
b.invoicenumber,
b.SaleTimestamp,
p.paid,
p.PayDateTime
from
salespayments p INNER JOIN
salesbasket b
WITHIN 7 DAYS
on b.invoiceNumber = p.invoiceNumber;
Pull queries don’t support JOIN clauses. See Queries - ksqlDB Documentation for more info.
Add EMIT CHANGES if you intended to issue a push query.
Made some progress: this returned the columns from the join condition.
Now to try to add the other columns, which also include objects (STRUCTs).
select
b.invoicenumber,
p.invoicenumber,
b.SaleTimestamp,
p.PayTimestamp
from
salespayments p INNER JOIN
salesbasket b
WITHIN 7 DAYS
on b.invoiceNumber = p.invoiceNumber
emit changes;
select
b.invoicenumber,
p.invoicenumber,
b.SaleTimestamp,
p.PayTimestamp,
b.store,
b.clerk
from
salespayments p INNER JOIN
salesbasket b
WITHIN 7 DAYS
on b.invoiceNumber = p.invoiceNumber
emit changes;
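If you need individual fields rather than the whole Store and Clerk objects, ksqlDB dereferences STRUCT fields with the -> operator. A sketch using the same join; the output aliases (StoreId, StoreName, ClerkName) are hypothetical:

```sql
-- Select single fields out of the STRUCT columns instead of whole objects.
SELECT b.InvoiceNumber,
       b.Store->Id   AS StoreId,
       b.Store->Name AS StoreName,
       b.Clerk->Name AS ClerkName,
       p.Paid
FROM salespayments p
  INNER JOIN salesbasket b WITHIN 7 DAYS
  ON b.InvoiceNumber = p.InvoiceNumber
EMIT CHANGES;
```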
CREATE STREAM salescompleted as
select
b.InvoiceNumber as InvNumber,
b.SaleDateTime,
b.SaleTimestamp,
b.TerminalPoint,
b.Nett,
b.Vat,
b.Total,
b.store,
b.clerk,
b.BasketItems,
p.FinTransactionID,
p.PayDateTime,
p.PayTimestamp,
p.Paid
from
salespayments p INNER JOIN
salesbasket b
WITHIN 7 DAYS
on b.InvoiceNumber = p.InvoiceNumber
emit changes;
Yes, because you told ksqlDB to read the column invoiceNumber from the message key, by adding the KEY keyword. If you want ksqlDB to read invoiceNumber from the message value, you must omit KEY.
Given that your data is partitioned by something in the message header rather than by the actual message key, ksqlDB gets “confused” if you do this: ksqlDB expects data to be partitioned by key, and it cannot understand other custom partitioning.
Now to figure out how to push this to a new stream and then to a new topic.
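If you want the invoice number to be the actual message key (and therefore what the topic is partitioned by), one option is to write a re-keyed copy of the stream with PARTITION BY. This is a sketch under that assumption; the stream and topic name salesbasket_by_invoice is hypothetical:

```sql
-- Re-key salesbasket by InvoiceNumber. ksqlDB writes the result to a new
-- topic whose message key is the invoice number.
CREATE STREAM salesbasket_by_invoice
  WITH (KAFKA_TOPIC='salesbasket_by_invoice', PARTITIONS=1) AS
  SELECT *
  FROM salesbasket
  PARTITION BY InvoiceNumber
  EMIT CHANGES;
```

A stream declared over the new topic could then mark InvoiceNumber with KEY, because that is where the value physically lives.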
That’s one step. Add CREATE STREAM <name> WITH (...) AS ... as a prefix (and use the WITH clause if you want to specify a particular topic name; otherwise the stream name will be used as the topic name). Note that there is no such thing as a STREAM without a topic: every STREAM is backed by a topic that stores its data.
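A sketch of that prefix applied to the salescompleted query above, with a trimmed column list. The lowercase topic name salescompleted is an assumption about what you want; without KAFKA_TOPIC, the topic is named after the stream, which ksqlDB upper-cases (SALESCOMPLETED).

```sql
-- Persist the joined result into an explicitly named output topic.
CREATE STREAM salescompleted
  WITH (KAFKA_TOPIC='salescompleted', VALUE_FORMAT='JSON', PARTITIONS=1) AS
  SELECT b.InvoiceNumber AS InvNumber,
         b.SaleTimestamp,
         b.Total,
         p.FinTransactionID,
         p.Paid
  FROM salespayments p
    INNER JOIN salesbasket b WITHIN 7 DAYS
    ON b.InvoiceNumber = p.InvoiceNumber
  EMIT CHANGES;
```

Running SHOW STREAMS; afterwards lists each stream together with the Kafka topic backing it, which is a quick way to confirm the topic name.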
My ksqlDB stream name is in caps, which I’m happy with.
My topics I name in lower case, but for some reason I can’t get CREATE STREAM ... WITH (...) AS to create the topic in lower case.
This is where I create the stream by combining two streams, and it is backed by a topic.
I understood the WITH (KAFKA_TOPIC='my_topic_name') you referenced as a way to create a stream that consumes from a topic (a source topic backing it for persistence), not as a way to output to a topic as a destination.
Thanks for the doc, I will go scan it in the morning.
I’m trying to force the output topic to lower case.
There are two forms: CREATE STREAM <schema> and CREATE STREAM ... AS SELECT. Both support the WITH clause. When you specify the topic name, think of it as the “input topic” for the former and as the “result topic” for the latter.