Hi Team,
I’ve run into an issue and don’t know how to resolve it.
Use Case: Create an order system to handle orders for Mobile and Apple devices.
- Receive an order
- Once the order has been received, 3 different validations need to run, each with a result of PASS or FAIL.
- Once all the validations have been performed, we need to aggregate the validation results per order and find the orders with a PASS count of 3. An order that meets this criterion needs its status updated (from “CREATED” to “VALIDATED”) for further processing.
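The use case above can be sketched in plain Python to pin down the intended logic, independent of ksqlDB. This is a minimal in-memory model, not the ksqlDB pipeline: the function name `mark_validated` and the validation type names are hypothetical (the post does not name the three validation types). As a design choice, it counts distinct PASSing validation types per order rather than raw PASS records, so a duplicated validation message cannot push an order to 3 on its own.

```python
from collections import defaultdict

REQUIRED_VALIDATIONS = 3  # each order receives three validation results


def mark_validated(orders, validations):
    """Return a copy of `orders` in which every CREATED order whose three
    validation types all PASSed has OrderState set to VALIDATED.

    orders:      dict of order id -> {"id": ..., "OrderState": ...}
    validations: list of dicts with orderId, OrderValidationType and
                 OrderValidationResult keys (mirroring the order-validations topic)
    """
    # Distinct validation types that passed, per order id.
    passed_types = defaultdict(set)
    for v in validations:
        if v["OrderValidationResult"] == "PASS":
            passed_types[v["orderId"]].add(v["OrderValidationType"])

    updated = {}
    for order_id, order in orders.items():
        new_order = dict(order)  # leave the caller's records untouched
        if (order["OrderState"] == "CREATED"
                and len(passed_types[order_id]) >= REQUIRED_VALIDATIONS):
            new_order["OrderState"] = "VALIDATED"
        updated[order_id] = new_order
    return updated


if __name__ == "__main__":
    # Hypothetical sample data; TYPE_A/B/C stand in for the unnamed validations.
    orders = {1: {"id": 1, "OrderState": "CREATED"},
              2: {"id": 2, "OrderState": "CREATED"}}
    validations = [
        {"orderId": 1, "OrderValidationType": t, "OrderValidationResult": "PASS"}
        for t in ("TYPE_A", "TYPE_B", "TYPE_C")
    ] + [
        {"orderId": 2, "OrderValidationType": "TYPE_A", "OrderValidationResult": "FAIL"},
    ]
    for order in mark_validated(orders, validations).values():
        print(order["id"], order["OrderState"])  # 1 VALIDATED, then 2 CREATED
```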
Topics:
Orders → keeps records of all the orders
order-validations → keeps records of all the validations performed on the orders in the Orders topic. There are 3 types of validation, so every order will have 3 records in this topic, each with the order id and a status of PASS or FAIL.
The solution requires two topics to be created:
- ORDERS → accepts all the orders
- ORDER-VALIDATIONS → keeps all the order validation results
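For reference, the two record shapes implied by the ksqlDB schemas in this post can be written as Python dataclasses. This is a sketch: the field names are copied from the s_o and s_ov definitions below, but the nesting of validation fields under a top-level `OrderValidation` object is inferred from the STRUCT column in s_ov rather than from a real message, and `parse_order` / `parse_validation` are hypothetical helpers.

```python
import json
from dataclasses import dataclass


@dataclass
class Order:
    """One record on the orders topic (the columns of s_o)."""
    id: int
    quantity: int
    price: int
    customerId: int
    OrderState: str  # "CREATED", later "VALIDATED"
    product: str


@dataclass
class OrderValidation:
    """One record on the order-validations topic (the OrderValidation STRUCT of s_ov)."""
    orderId: int
    OrderValidationType: str
    OrderValidationResult: str  # "PASS" or "FAIL"


def parse_order(raw: str) -> Order:
    # The JSON keys must match the dataclass fields exactly.
    return Order(**json.loads(raw))


def parse_validation(raw: str) -> OrderValidation:
    # The fields sit inside a top-level OrderValidation object, which is
    # what the STRUCT column declared for s_ov expects.
    return OrderValidation(**json.loads(raw)["OrderValidation"])
```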
Note: I’m using ksqldb-cli (6.0.0) in the all-in-one Docker container.
KSQL Processing:
- Created 2 streams, one from each topic: s_o (from orders) and s_ov (from order-validations).
-- Create Stream of Orders topic as s_o
CREATE stream s_o (id integer , quantity integer,price integer,customerId integer,OrderState varchar, product varchar ) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');
-- Create Stream of Order-validations topic as s_ov
CREATE STREAM s_ov (OrderValidation STRUCT <OrderValidationResult VARCHAR, OrderValidationType VARCHAR,orderId integer> ) WITH (KAFKA_TOPIC='order-validations', VALUE_FORMAT='JSON');
- Count the PASS validation results per order with a 60-second tumbling window, and store them in a table called t_s_ov:
-- Create a table that counts PASS validation results, grouped by orderId
CREATE TABLE t_s_ov AS select ORDERVALIDATION->orderid as id, count(*) as total from s_ov window tumbling (size 60 seconds) where ORDERVALIDATION->ORDERVALIDATIONRESULT ='PASS' group by ORDERVALIDATION->orderid emit changes;
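The CREATE TABLE above counts PASS results per order inside 60-second tumbling windows. Tumbling windows are fixed-size, non-overlapping and aligned to the epoch, so each record falls into the window starting at `timestamp - timestamp % size`. The sketch below models only that bucketing (it ignores grace periods and late records; `tumbling_pass_counts` is a hypothetical name). One consequence worth checking for this use case: three PASS results for the same order that straddle a window boundary are split across two windows, and neither window reaches a count of 3.

```python
from collections import Counter

WINDOW_SIZE_MS = 60_000  # WINDOW TUMBLING (SIZE 60 SECONDS)


def window_start(timestamp_ms: int) -> int:
    """Start of the epoch-aligned tumbling window containing timestamp_ms."""
    return timestamp_ms - timestamp_ms % WINDOW_SIZE_MS


def tumbling_pass_counts(events):
    """Count PASS results per (orderId, window start).

    events: iterable of (timestamp_ms, order_id, result) tuples.
    """
    counts = Counter()
    for timestamp_ms, order_id, result in events:
        if result == "PASS":  # mirrors the WHERE clause
            counts[(order_id, window_start(timestamp_ms))] += 1
    return dict(counts)
```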
- Join the orders from s_o with t_s_ov to update the order status to “VALIDATED” once the count is 3 or more (a count of 3 means all the validations have passed).
Since KSQL doesn’t allow a join between the persistent table and the non-persistent stream, I created a stream from the topic “T_S_OV” (the topic is created internally by the CREATE TABLE statement above):
CREATE STREAM s_t_s_ov (id integer KEY, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
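The join described in this step (s_o against t_s_ov) is a stream-table join. In ksqlDB, which builds on Kafka Streams, each record on the stream side looks up the table's current value for its key and produces output, while table updates only change stored state and emit nothing on their own. The sketch below models that rule for orders joined against per-order PASS counts; the event encoding and the function name are hypothetical, and windowing is ignored. It highlights a timing question for this design: an order that arrives before its validations are counted is joined against a missing or low count and is not revisited later.

```python
def stream_table_join(events, required=3):
    """Simulate a stream-table join of orders against PASS counts.

    events: time-ordered list of
        ("count", order_id, total)  -> an update to the counts table
        ("order", order_id, state)  -> a record on the orders stream
    Returns the joined output, one (order_id, state) per order record.
    """
    latest_total = {}  # table state: order id -> latest count
    output = []
    for kind, order_id, payload in events:
        if kind == "count":
            latest_total[order_id] = payload  # state change only, no output
        elif kind == "order":
            total = latest_total.get(order_id, 0)
            state = "VALIDATED" if total >= required else payload
            output.append((order_id, state))
        else:
            raise ValueError(f"unknown event kind: {kind}")
    return output
```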
- Verify the results
Details:
ksql> DROP STREAM IF EXISTS s_t_s_ov;
>CREATE STREAM s_t_s_ov (id integer KEY, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
DROP STREAM IF EXISTS s_t_s_ov;
Message
------------------------------------------------
Source `S_T_S_OV` (topic: T_S_OV) was dropped.
------------------------------------------------
CREATE STREAM s_t_s_ov (id integer KEY, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
Message
----------------
Stream created
----------------
ksql> select * from s_t_s_ov emit changes limit 3;
+--------------------------------------------------------+--------------------------------------------------------+
|ID |TOTAL |
+--------------------------------------------------------+--------------------------------------------------------+
Press CTRL-C to interrupt
Does anyone know what I’m doing wrong, or another way to achieve this?
Orders Stream
ksql> describe extended s_o;
Name : S_O
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : orders (partitions: 1, replication: 1)
Statement : CREATE stream s_o (id integer , quantity integer,price integer,customerId integer,OrderState varchar, product varchar ) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON')
;
Field | Type
------------------------------
ID | INTEGER
QUANTITY | INTEGER
PRICE | INTEGER
CUSTOMERID | INTEGER
ORDERSTATE | VARCHAR(STRING)
PRODUCT | VARCHAR(STRING)
------------------------------
Local runtime statistics
------------------------
consumer-messages-per-sec: 0.27 consumer-total-bytes: 6086118 consumer-total-messages: 34341 last-message: 2021-04-26T15:08:40.681Z
consumer-failed-messages: 5802 consumer-failed-messages-per-sec: 0 last-failed: 2021-04-23T04:13:57.008Z
(Statistics of the local KSQL server interaction with the Kafka topic orders)
Order-validations Stream
ksql> describe extended s_ov;
Name : S_OV
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : order-validations (partitions: 1, replication: 1)
Statement : CREATE STREAM s_ov (OrderValidation STRUCT <OrderValidationResult VARCHAR, OrderValidationType VARCHAR,orderId integer> ) WITH (KAFKA_TOPIC='order-validations', VALUE_FORMAT='JSON');
Field | Type
---------------------------------------------------------------------------------------------------------------------
ORDERVALIDATION | STRUCT<ORDERVALIDATIONRESULT VARCHAR(STRING), ORDERVALIDATIONTYPE VARCHAR(STRING), ORDERID INTEGER>
---------------------------------------------------------------------------------------------------------------------
Queries that read from this STREAM
-----------------------------------
CTAS_T_S_OV_1_155 (RUNNING) : CREATE TABLE T_S_OV_1 WITH (KAFKA_TOPIC='T_S_OV_1', PARTITIONS=1, REPLICAS=1) AS SELECT S_OV.ORDERVALIDATION->ORDERID ID, COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS ) WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;
CTAS_T_S_OV_85 (RUNNING) : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT S_OV.ORDERVALIDATION->ORDERID ORDERID, COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS ) WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
consumer-messages-per-sec: 1.09 consumer-total-bytes: 10836043 consumer-total-messages: 80796 last-message: 2021-04-27T01:39:21.178Z
consumer-failed-messages: 704 consumer-failed-messages-per-sec: 0 last-failed: 2021-04-26T23:58:47.916Z
(Statistics of the local KSQL server interaction with the Kafka topic order-validations)
Order-validations Table (aggregation)
ksql> describe extended t_s_ov;
Name : T_S_OV
Type : TABLE
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : T_S_OV (partitions: 1, replication: 1)
Statement : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT
S_OV.ORDERVALIDATION->ORDERID ORDERID,
COUNT(*) TOTAL
FROM S_OV S_OV
WINDOW TUMBLING ( SIZE 60 SECONDS )
WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS')
GROUP BY S_OV.ORDERVALIDATION->ORDERID
EMIT CHANGES;
Field | Type
------------------------------------------------------------------
ORDERID | INTEGER (primary key) (Window type: TUMBLING)
TOTAL | BIGINT
------------------------------------------------------------------
Queries that write from this TABLE
-----------------------------------
CTAS_T_S_OV_85 (RUNNING) : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT S_OV.ORDERVALIDATION->ORDERID ORDERID, COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS ) WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 0.09 total-messages: 7730 last-message: 2021-04-27T01:40:16.45Z
(Statistics of the local KSQL server interaction with the Kafka topic T_S_OV)
Stream over the Order-validations Table
ksql> describe extended s_t_s_ov;
Name : S_T_S_OV
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : T_S_OV (partitions: 1, replication: 1)
Statement : CREATE STREAM s_t_s_ov (id integer KEY, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
Field | Type
--------------------------------
ID | INTEGER (key)
TOTAL | INTEGER
--------------------------------
Local runtime statistics
------------------------
messages-per-sec: 0.09 total-messages: 7737 last-message: 2021-04-27T01:41:31.567Z
(Statistics of the local KSQL server interaction with the Kafka topic T_S_OV)
Notice: total-messages is 7737, yet the query doesn’t return any rows.
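A possible explanation for the empty result, offered as an assumption rather than a confirmed diagnosis: T_S_OV is produced by a windowed aggregation, and Kafka Streams serializes a time-windowed key as the key bytes followed by an 8-byte window start timestamp. A stream declared with a plain `id integer KEY` expects exactly 4 key bytes, so every record may fail key deserialization and be skipped instead of shown. The sketch below only illustrates that byte layout with Python's `struct` module; `windowed_key`, `decode_plain_int_key` and `decode_windowed_int_key` are hypothetical helpers, not ksqlDB code. If this is the cause, declaring the stream with ksqlDB's `WINDOW_TYPE='TUMBLING'` and `WINDOW_SIZE='60 SECONDS'` WITH properties may be worth trying, keeping in mind that TOTAL is BIGINT in T_S_OV.

```python
import struct

INT_KEY = ">i"       # KAFKA-format INTEGER key: 4-byte big-endian
WINDOW_START = ">q"  # window start: 8-byte big-endian epoch millis


def windowed_key(order_id: int, window_start_ms: int) -> bytes:
    """Bytes of a time-windowed key: the plain key followed by the window start."""
    return struct.pack(INT_KEY, order_id) + struct.pack(WINDOW_START, window_start_ms)


def decode_plain_int_key(key: bytes) -> int:
    """Decode a non-windowed INTEGER key, rejecting anything that is not 4 bytes."""
    if len(key) != 4:
        raise ValueError(f"expected 4 bytes for an INTEGER key, got {len(key)}")
    return struct.unpack(INT_KEY, key)[0]


def decode_windowed_int_key(key: bytes):
    """Split a windowed key back into (order_id, window_start_ms)."""
    order_id, start = struct.unpack(">iq", key)
    return order_id, start
```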
I tried another version of the CREATE STREAM statement, which returns null for the order id:
ksql> CREATE STREAM s_t_s_ov1 (id integer, total integer ) WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
>
Message
----------------
Stream created
----------------
ksql> select * from s_t_s_ov1 emit changes limit 3;
+--------------------------------------------------------+--------------------------------------------------------+
|ID |TOTAL |
+--------------------------------------------------------+--------------------------------------------------------+
|null |1 |
|null |1 |
|null |1 |
Limit Reached
Query terminated
Notice that I removed KEY from the id column in this statement.
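The null ids are consistent with the JSON value of each T_S_OV record not containing an ID field: the GROUP BY column appears to be written only to the Kafka message key, and a value column with no matching JSON field is read as NULL. The T_S_OV key column is also named ORDERID rather than ID (see the DESCRIBE output above). The sketch below mimics that column mapping with a hypothetical `read_value_columns` helper; the sample value `{"TOTAL": 1}` is an assumption inferred from the query output, not a captured message.

```python
import json


def read_value_columns(value_json: str, columns):
    """Map declared value columns onto a JSON record; absent fields become None (NULL)."""
    record = json.loads(value_json)
    return {column: record.get(column) for column in columns}


if __name__ == "__main__":
    # Assumed value of a T_S_OV record, inferred from the output above.
    print(read_value_columns('{"TOTAL": 1}', ["ID", "TOTAL"]))  # {'ID': None, 'TOTAL': 1}
```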
(cross posted to StackOverflow)