WITHIN clause in ksqlDB

I have two streams.
One is CUSTOMERS_STREAM, with fields CUSTOMER_ID, CUSTOMER_FIRST_NAME, CUSTOMER_LAST_NAME, CUSTOMER_EMAIL, from topic dbserver1.inventory.customers.
The other is ORDERS_STREAM, with fields ORDER_ID, ORDER_DATE, PURCHASER, ORDER_QUANTIY, PRODUCT_ID, from topic dbserver1.inventory.orders.

The relation between the two topics is that purchaser is the ID of the customer.

So, to merge the two streams, my query goes like this:

SELECT
    o.PAYLOAD->AFTER->id AS ORDER_ID,
    o.PAYLOAD->AFTER->order_date AS ORDER_DATE,
    c.PAYLOAD->AFTER->id AS CUSTOMER_ID,
    c.PAYLOAD->AFTER->first_name AS CUSTOMER_FIRST_NAME,
    c.PAYLOAD->AFTER->last_name AS CUSTOMER_LAST_NAME,
    c.PAYLOAD->AFTER->email AS CUSTOMER_EMAIL,
    o.PAYLOAD->AFTER->quantity AS ORDER_QUANTIY,
    o.PAYLOAD->AFTER->product_id AS PRODUCT_ID
FROM
    CUSTOMERS_STREAM c
JOIN
    ORDERS_STREAM o
ON
    c.PAYLOAD->AFTER->id = o.PAYLOAD->AFTER->purchaser
EMIT CHANGES;

and I got this error:
Stream-Stream joins must have a WITHIN clause specified. None was provided. To learn about how to specify a WITHIN clause with a stream-stream join, please visit: https://docs.confluent.io/current/ksql/docs/syntax-reference.html#create-stream-as-select

After searching, I learned that WITHIN is mandatory for stream-stream joins. This is where the issue starts.
I currently have 18 order records in the database (order ID 10005 is missing).

If I write

SELECT
    o.PAYLOAD->AFTER->id AS ORDER_ID,
    o.PAYLOAD->AFTER->order_date AS ORDER_DATE,
    c.PAYLOAD->AFTER->id AS CUSTOMER_ID,
    c.PAYLOAD->AFTER->first_name AS CUSTOMER_FIRST_NAME,
    c.PAYLOAD->AFTER->last_name AS CUSTOMER_LAST_NAME,
    c.PAYLOAD->AFTER->email AS CUSTOMER_EMAIL,
    o.PAYLOAD->AFTER->quantity AS ORDER_QUANTIY,
    o.PAYLOAD->AFTER->product_id AS PRODUCT_ID
FROM
    CUSTOMERS_STREAM c
JOIN
    ORDERS_STREAM o WITHIN 5 MINUTE
ON
    c.PAYLOAD->AFTER->id = o.PAYLOAD->AFTER->purchaser
EMIT CHANGES;

this returned only the first 4 records:

+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ORDER_ID               |ORDER_DATE             |CUSTOMER_ID            |CUSTOMER_FIRST_NAME    |CUSTOMER_LAST_NAME     |CUSTOMER_EMAIL         |ORDER_QUANTIY          |PRODUCT_ID             |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|10001                  |16816                  |1001                   |Sally                  |Thomas                 |sally.thomas@acme.com  |1                      |102                    |
|10002                  |16817                  |1002                   |George                 |Bailey                 |gbailey@foobar.com     |2                      |105                    |
|10003                  |16850                  |1002                   |George                 |Bailey                 |gbailey@foobar.com     |2                      |106                    |
|10004                  |16852                  |1003                   |Edward                 |Walker                 |ed@walker.com          |1                      |107                    |

If I write WITHIN 5 HOUR, I get:

+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ORDER_ID               |ORDER_DATE             |CUSTOMER_ID            |CUSTOMER_FIRST_NAME    |CUSTOMER_LAST_NAME     |CUSTOMER_EMAIL         |ORDER_QUANTIY          |PRODUCT_ID             |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|10001                  |16816                  |1001                   |Sally                  |Thomas                 |sally.thomas@acme.com  |1                      |102                    |
|10002                  |16817                  |1002                   |George                 |Bailey                 |gbailey@foobar.com     |2                      |105                    |
|10003                  |16850                  |1002                   |George                 |Bailey                 |gbailey@foobar.com     |2                      |106                    |
|10004                  |16852                  |1003                   |Edward                 |Walker                 |ed@walker.com          |1                      |107                    |
|10007                  |19445                  |1007                   |Eswar Sai              |Kalipalli              |eswar@gmail.com        |10                     |107                    |
|10008                  |19445                  |1007                   |Eswar Sai              |Kalipalli              |eswar@gmail.com        |10                     |107                    |
|10009                  |19445                  |1007                   |Eswar Sai              |Kalipalli              |eswar@gmail.com        |10                     |107                    |
|10010                  |19445                  |1007                   |Eswar Sai              |Kalipalli              |eswar@gmail.com        |10                     |106                    |

Here you can see that 10006 is missing, and I don't know why.

If I write WITHIN 30 DAY, I get all records:

+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ORDER_ID               |ORDER_DATE             |CUSTOMER_ID            |CUSTOMER_FIRST_NAME    |CUSTOMER_LAST_NAME     |CUSTOMER_EMAIL         |ORDER_QUANTIY          |PRODUCT_ID             |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|10001                  |16816                  |1001                   |Sally                  |Thomas                 |sally.thomas@acme.com  |1                      |102                    |
|10002                  |16817                  |1002                   |George                 |Bailey                 |gbailey@foobar.com     |2                      |105                    |
|10003                  |16850                  |1002                   |George                 |Bailey                 |gbailey@foobar.com     |2                      |106                    |
|10004                  |16852                  |1003                   |Edward                 |Walker                 |ed@walker.com          |1                      |107                    |
|10006                  |19444                  |1001                   |Sally                  |Thomas                 |sally.thomas@acme.com  |5                      |104                    |
|10007                  |19445                  |1007                   |Eswar Sai              |Kalipalli              |eswar@gmail.com        |10                     |107                    |
|10008                  |19445                  |1007                   |Eswar Sai              |Kalipalli              |eswar@gmail.com        |10                     |107                    |
|10009                  |19445                  |1007                   |Eswar Sai              |Kalipalli              |eswar@gmail.com        |10                     |107                    |
|10010                  |19445                  |1007                   |Eswar Sai              |Kalipalli              |eswar@gmail.com        |10                     |106                    |
|10011                  |19445                  |1006                   |likhith                |kanigolla              |likhitht@gmail.com     |10                     |106                    |
|10012                  |19445                  |1006                   |likhith                |kanigolla              |likhitht@gmail.com     |10                     |106                    |
|10013                  |19445                  |1006                   |likhith                |kanigolla              |likhitht@gmail.com     |10                     |106                    |
|10014                  |19445                  |1006                   |likhith                |kanigolla              |likhitht@gmail.com     |10                     |106                    |
|10015                  |19445                  |1006                   |likhith                |kanigolla              |likhitht@gmail.com     |10                     |106                    |
|10016                  |19445                  |1006                   |likhith                |kanigolla              |likhitht@gmail.com     |10                     |106                    |
|10017                  |19445                  |1006                   |likhith                |kanigolla              |likhitht@gmail.com     |10                     |106                    |
|10018                  |19445                  |1006                   |likhith                |kanigolla              |likhitht@gmail.com     |10                     |106                    |
|10019                  |19445                  |1006                   |likhith                |kanigolla              |likhitht@gmail.com     |10                     |106                    |

I don't understand this behaviour and I am confused. One more question:

Is it mandatory to have a timestamp column in the streams to merge them?
I would also like to know whether this is caused by the data, the configuration, or something else.

The commands I used are:

----------------------------ORDERS-------------------------------------------
CREATE STREAM ORDERS_STREAM (
    schema STRUCT< >,
    payload STRUCT<
        before STRUCT<id INT, order_date INT, purchaser INT, quantity INT, product_id INT>,
        after STRUCT<id INT, order_date INT, purchaser INT, quantity INT, product_id INT>,
        source STRUCT< >,
        op VARCHAR,
        ts_ms DOUBLE,
        transaction VARCHAR>)
WITH (KAFKA_TOPIC='dbserver1.inventory.orders', VALUE_FORMAT='JSON', PARTITIONS=1);

--------------------------CUSTOMERS-------------------------------------------
CREATE STREAM CUSTOMERS_STREAM (
    schema STRUCT< >,
    payload STRUCT<
        before STRUCT<id INT, first_name VARCHAR, last_name VARCHAR, email VARCHAR>,
        after STRUCT<id INT, first_name VARCHAR, last_name VARCHAR, email VARCHAR>,
        source STRUCT< >,
        op VARCHAR,
        ts_ms DOUBLE,
        transaction VARCHAR>)
WITH (KAFKA_TOPIC='dbserver1.inventory.customers', VALUE_FORMAT='JSON', PARTITIONS=1);

@dtroiano can you just look into this one too?

"So, to merge two streams my query goes like this"

You say “merge” – sounds like UNION to me, not like a JOIN?

A stream-stream join uses a “time window”, which you specify with the WITHIN clause, to join all records that occur temporally close to each other. (Cf https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/)
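
If it helps, here is a minimal sketch for checking how far apart the records are in time. It assumes the stream definitions from the question, and that both streams use the default record timestamp (exposed as ROWTIME), since no TIMESTAMP property is set in their WITH clauses. A separate timestamp column should therefore not be required for the join. The EVENT_TIME alias is only for illustration.

```sql
-- Read each topic from the beginning so that older records are included.
SET 'auto.offset.reset' = 'earliest';

-- Timestamp that the WITHIN window is evaluated against, per order record.
SELECT
    ROWTIME,
    TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS EVENT_TIME,
    PAYLOAD->AFTER->id AS ORDER_ID,
    PAYLOAD->AFTER->purchaser AS PURCHASER
FROM ORDERS_STREAM
EMIT CHANGES;

-- Same check for the customer records.
SELECT
    ROWTIME,
    TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS EVENT_TIME,
    PAYLOAD->AFTER->id AS CUSTOMER_ID
FROM CUSTOMERS_STREAM
EMIT CHANGES;
```

If an order and its customer record have timestamps further apart than the WITHIN window, that pair will not join, which would be consistent with seeing more rows as the window grows.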

Also, if you say JOIN you specify an inner join, i.e., records with no join partner on the other side within the window do not appear in the result. Maybe a [FULL] OUTER JOIN would help (if a join is really what you want).
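
As a rough sketch of the outer-join variant, assuming every order should appear in the result even when no customer record falls inside the window, the query from the question could put ORDERS_STREAM on the left and use a LEFT JOIN. The 30 DAYS window is simply the value that returned all rows in the question, not a recommendation.

```sql
-- Keep every order; customer columns are NULL when no customer record
-- with a matching id falls inside the window.
SELECT
    o.PAYLOAD->AFTER->id AS ORDER_ID,
    o.PAYLOAD->AFTER->order_date AS ORDER_DATE,
    c.PAYLOAD->AFTER->id AS CUSTOMER_ID,
    c.PAYLOAD->AFTER->first_name AS CUSTOMER_FIRST_NAME,
    c.PAYLOAD->AFTER->last_name AS CUSTOMER_LAST_NAME,
    c.PAYLOAD->AFTER->email AS CUSTOMER_EMAIL,
    o.PAYLOAD->AFTER->quantity AS ORDER_QUANTIY,
    o.PAYLOAD->AFTER->product_id AS PRODUCT_ID
FROM
    ORDERS_STREAM o
LEFT JOIN
    CUSTOMERS_STREAM c WITHIN 30 DAYS
ON
    o.PAYLOAD->AFTER->purchaser = c.PAYLOAD->AFTER->id
EMIT CHANGES;
```

Orders without a matching customer record in the window would then come out with NULL in the customer columns instead of being dropped.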

I guess the main question is: what result do you want to compute, and we can work backwards from there.
