I have two streams.
One is CUSTOMERS_STREAM, with fields CUSTOMER_ID, CUSTOMER_FIRST_NAME, CUSTOMER_LAST_NAME, CUSTOMER_EMAIL, from topic dbserver1.inventory.customers,
and the other is ORDERS_STREAM, with fields ORDER_ID, ORDER_DATE, PURCHASER, ORDER_QUANTIY, PRODUCT_ID, from topic dbserver1.inventory.orders.
The relation between the two topics is that purchaser is the id of the customer.
To join the two streams, my query looks like this:
SELECT
o.PAYLOAD->AFTER->id AS ORDER_ID,
o.PAYLOAD->AFTER->order_date AS ORDER_DATE,
c.PAYLOAD->AFTER->id AS CUSTOMER_ID,
c.PAYLOAD->AFTER->first_name AS CUSTOMER_FIRST_NAME,
c.PAYLOAD->AFTER->last_name AS CUSTOMER_LAST_NAME,
c.PAYLOAD->AFTER->email AS CUSTOMER_EMAIL,
o.PAYLOAD->AFTER->quantity AS ORDER_QUANTIY,
o.PAYLOAD->AFTER->product_id AS PRODUCT_ID
FROM
CUSTOMERS_STREAM c
JOIN
ORDERS_STREAM o
ON
c.PAYLOAD->AFTER->id = o.PAYLOAD->AFTER->purchaser
EMIT CHANGES;
and I got this error:
Stream-Stream joins must have a WITHIN clause specified. None was provided. To learn about how to specify a WITHIN clause with a stream-stream join, please visit: https://docs.confluent.io/current/ksql/docs/syntax-reference.html#create-stream-as-select
After searching, I learned that the WITHIN clause is mandatory for stream-stream joins, and that is where the problem starts.
I currently have 18 order records in the database (order id 10005 is missing).
If I write
SELECT
o.PAYLOAD->AFTER->id AS ORDER_ID,
o.PAYLOAD->AFTER->order_date AS ORDER_DATE,
c.PAYLOAD->AFTER->id AS CUSTOMER_ID,
c.PAYLOAD->AFTER->first_name AS CUSTOMER_FIRST_NAME,
c.PAYLOAD->AFTER->last_name AS CUSTOMER_LAST_NAME,
c.PAYLOAD->AFTER->email AS CUSTOMER_EMAIL,
o.PAYLOAD->AFTER->quantity AS ORDER_QUANTIY,
o.PAYLOAD->AFTER->product_id AS PRODUCT_ID
FROM
CUSTOMERS_STREAM c
JOIN
ORDERS_STREAM o WITHIN 5 MINUTE
ON
c.PAYLOAD->AFTER->id = o.PAYLOAD->AFTER->purchaser
EMIT CHANGES;
it returns only the first 4 records:
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ORDER_ID |ORDER_DATE |CUSTOMER_ID |CUSTOMER_FIRST_NAME |CUSTOMER_LAST_NAME |CUSTOMER_EMAIL |ORDER_QUANTIY |PRODUCT_ID |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|10001 |16816 |1001 |Sally |Thomas |sally.thomas@acme.com |1 |102 |
|10002 |16817 |1002 |George |Bailey |gbailey@foobar.com |2 |105 |
|10003 |16850 |1002 |George |Bailey |gbailey@foobar.com |2 |106 |
|10004 |16852 |1003 |Edward |Walker |ed@walker.com |1 |107 |
If I write WITHIN 5 HOUR,
I get:
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ORDER_ID |ORDER_DATE |CUSTOMER_ID |CUSTOMER_FIRST_NAME |CUSTOMER_LAST_NAME |CUSTOMER_EMAIL |ORDER_QUANTIY |PRODUCT_ID |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|10001 |16816 |1001 |Sally |Thomas |sally.thomas@acme.com |1 |102 |
|10002 |16817 |1002 |George |Bailey |gbailey@foobar.com |2 |105 |
|10003 |16850 |1002 |George |Bailey |gbailey@foobar.com |2 |106 |
|10004 |16852 |1003 |Edward |Walker |ed@walker.com |1 |107 |
|10007 |19445 |1007 |Eswar Sai |Kalipalli |eswar@gmail.com |10 |107 |
|10008 |19445 |1007 |Eswar Sai |Kalipalli |eswar@gmail.com |10 |107 |
|10009 |19445 |1007 |Eswar Sai |Kalipalli |eswar@gmail.com |10 |107 |
|10010 |19445 |1007 |Eswar Sai |Kalipalli |eswar@gmail.com |10 |106 |
Here you can see that 10006 is missing, and I don’t know why.
If I write WITHIN 30 DAY,
I get all the records:
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ORDER_ID |ORDER_DATE |CUSTOMER_ID |CUSTOMER_FIRST_NAME |CUSTOMER_LAST_NAME |CUSTOMER_EMAIL |ORDER_QUANTIY |PRODUCT_ID |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|10001 |16816 |1001 |Sally |Thomas |sally.thomas@acme.com |1 |102 |
|10002 |16817 |1002 |George |Bailey |gbailey@foobar.com |2 |105 |
|10003 |16850 |1002 |George |Bailey |gbailey@foobar.com |2 |106 |
|10004 |16852 |1003 |Edward |Walker |ed@walker.com |1 |107 |
|10006 |19444 |1001 |Sally |Thomas |sally.thomas@acme.com |5 |104 |
|10007 |19445 |1007 |Eswar Sai |Kalipalli |eswar@gmail.com |10 |107 |
|10008 |19445 |1007 |Eswar Sai |Kalipalli |eswar@gmail.com |10 |107 |
|10009 |19445 |1007 |Eswar Sai |Kalipalli |eswar@gmail.com |10 |107 |
|10010 |19445 |1007 |Eswar Sai |Kalipalli |eswar@gmail.com |10 |106 |
|10011 |19445 |1006 |likhith |kanigolla |likhitht@gmail.com |10 |106 |
|10012 |19445 |1006 |likhith |kanigolla |likhitht@gmail.com |10 |106 |
|10013 |19445 |1006 |likhith |kanigolla |likhitht@gmail.com |10 |106 |
|10014 |19445 |1006 |likhith |kanigolla |likhitht@gmail.com |10 |106 |
|10015 |19445 |1006 |likhith |kanigolla |likhitht@gmail.com |10 |106 |
|10016 |19445 |1006 |likhith |kanigolla |likhitht@gmail.com |10 |106 |
|10017 |19445 |1006 |likhith |kanigolla |likhitht@gmail.com |10 |106 |
|10018 |19445 |1006 |likhith |kanigolla |likhitht@gmail.com |10 |106 |
|10019 |19445 |1006 |likhith |kanigolla |likhitht@gmail.com |10 |106 |
I don’t understand this behaviour and I am confused. One more question:
Is it mandatory to have a timestamp column in the streams in order to join them?
I would also like to know whether this is caused by the data, by the configuration, or by something else.
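For reference, my understanding (an assumption on my part, not something I have confirmed for this setup) is that WITHIN compares the ROWTIME of the two records being joined, which by default is the Kafka message timestamp rather than ORDER_DATE. A minimal sketch of how the timestamps on both sides could be inspected, using only the two streams defined in this post; the column aliases are purely illustrative:

```sql
-- Read each topic from the beginning so already-ingested records are shown.
SET 'auto.offset.reset' = 'earliest';

-- Record timestamp (ROWTIME) of every order event.
SELECT
  PAYLOAD->AFTER->id AS ORDER_ID,
  PAYLOAD->AFTER->purchaser AS PURCHASER,
  TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_EVENT_TIME
FROM ORDERS_STREAM
EMIT CHANGES;

-- Record timestamp (ROWTIME) of every customer event.
SELECT
  PAYLOAD->AFTER->id AS CUSTOMER_ID,
  TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS CUSTOMER_EVENT_TIME
FROM CUSTOMERS_STREAM
EMIT CHANGES;
```

Each of these is a push query, so it has to be cancelled before the next one is run. If an order and its customer show timestamps further apart than the WITHIN interval, that pair would fall outside the join window, which might be why a larger interval returns more rows.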
The commands I used are:
----------------------------ORDERS-------------------------------------------
CREATE STREAM ORDERS_STREAM (
schema STRUCT< >,
payload STRUCT<before STRUCT<id INT,order_date INT,purchaser INT,quantity INT,product_id INT>,
after STRUCT<id INT,order_date INT,purchaser INT,quantity INT,product_id INT>,
source STRUCT< >,
op VARCHAR,
ts_ms DOUBLE,
transaction VARCHAR>)
WITH (KAFKA_TOPIC='dbserver1.inventory.orders',VALUE_FORMAT='JSON',PARTITIONS=1);
--------------------------CUSTOMERS-------------------------------------------
CREATE STREAM CUSTOMERS_STREAM (
schema STRUCT< >,
payload STRUCT<before STRUCT<id INT,first_name VARCHAR,last_name VARCHAR,email VARCHAR>,
after STRUCT<id INT,first_name VARCHAR,last_name VARCHAR,email VARCHAR>,
source STRUCT< >,
op VARCHAR,
ts_ms DOUBLE,
transaction VARCHAR>)
WITH (KAFKA_TOPIC='dbserver1.inventory.customers',VALUE_FORMAT='JSON',PARTITIONS=1);