ksqlDB - Stream doesn't produce any results when declared with a KEY column

Hi team,
I've run into an issue and have no idea how to resolve it.

Use Case: Create an order system to handle orders for Mobile and Apple devices.

  1. Receive an order.
  2. Once the order has been received, three different validations run on it, each resulting in PASS or FAIL.
  3. Once all the validations have run, aggregate the PASS results per order. An order with a PASS count of 3 has met all the validation criteria, and its status needs to be updated from “CREATED” to “VALIDATED” for further processing.
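To pin down the intended rule, here is a minimal in-memory Python sketch (not ksqlDB code, and ignoring the 60-second window used later). The record shape follows the s_ov schema from step 1; the validation type names and the helpers `apply_validations` and `validation` are hypothetical.

```python
from collections import Counter

REQUIRED_PASSES = 3  # three validation types, each must PASS


def apply_validations(orders, validations):
    """Return a new {order_id: state} dict after applying validation results.

    orders: {order_id: state}, e.g. {1: "CREATED"}
    validations: records shaped like the order-validations JSON values, e.g.
        {"OrderValidation": {"OrderValidationResult": "PASS",
                             "OrderValidationType": "TYPE_A",
                             "orderId": 1}}
    An order moves from CREATED to VALIDATED once it has at least
    REQUIRED_PASSES PASS results; every other order keeps its state.
    """
    passes = Counter(
        v["OrderValidation"]["orderId"]
        for v in validations
        if v["OrderValidation"]["OrderValidationResult"] == "PASS"
    )
    return {
        order_id: "VALIDATED"
        if state == "CREATED" and passes[order_id] >= REQUIRED_PASSES
        else state
        for order_id, state in orders.items()
    }


def validation(order_id, vtype, result):
    """Build one validation record (the vtype names are hypothetical)."""
    return {"OrderValidation": {"OrderValidationResult": result,
                                "OrderValidationType": vtype,
                                "orderId": order_id}}


validations = [
    validation(1, "TYPE_A", "PASS"),
    validation(1, "TYPE_B", "PASS"),
    validation(1, "TYPE_C", "PASS"),
    validation(2, "TYPE_A", "PASS"),
    validation(2, "TYPE_B", "FAIL"),
    validation(2, "TYPE_C", "PASS"),
]
print(apply_validations({1: "CREATED", 2: "CREATED"}, validations))
# {1: 'VALIDATED', 2: 'CREATED'}
```

Order 2 has only two PASS results, so it stays CREATED.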

Topics:
orders → keeps a record of every order.
order-validations → keeps a record of every validation run against the orders in the orders topic. There are 3 types of validation, so every order has 3 records in this topic, each containing the order ID and a status of PASS or FAIL.

The solution requires two topics to be created:

  1. orders → accepts all the orders
  2. order-validations → keeps all the order validation results

Note: I’m using ksqldb-cli (6.0.0) in an all-in-one Docker container.

KSQL Processing:

  1. Created two streams, one from each topic: s_o (from orders) and s_ov (from order-validations).
-- Create stream s_o from the orders topic
CREATE stream s_o  (id integer , quantity integer,price integer,customerId integer,OrderState varchar, product varchar )  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');
-- Create stream s_ov from the order-validations topic
CREATE STREAM s_ov  (OrderValidation STRUCT <OrderValidationResult VARCHAR, OrderValidationType VARCHAR,orderId integer> )  WITH (KAFKA_TOPIC='order-validations', VALUE_FORMAT='JSON'); 
  2. Count the PASS validation results per order ID in 60-second tumbling windows, and store the counts in a table called t_s_ov.
-- Create a table that counts PASS validation results, grouped by orderId
CREATE TABLE t_s_ov  AS select ORDERVALIDATION->orderid as id,  count(*) as total  from s_ov  window tumbling (size 60 seconds)   where ORDERVALIDATION->ORDERVALIDATIONRESULT ='PASS'   group by ORDERVALIDATION->orderid emit changes;
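Because of WINDOW TUMBLING (SIZE 60 SECONDS), each row of t_s_ov is keyed by order ID plus window, not by order ID alone. A minimal Python sketch of that bucketing, assuming epoch-aligned tumbling windows as in Kafka Streams; the function names and timestamps are illustrative:

```python
from collections import Counter

WINDOW_SIZE_MS = 60_000  # matches WINDOW TUMBLING (SIZE 60 SECONDS)


def window_start(ts_ms, size_ms=WINDOW_SIZE_MS):
    """Start of the epoch-aligned tumbling window that contains ts_ms."""
    return ts_ms - ts_ms % size_ms


def windowed_pass_counts(events, size_ms=WINDOW_SIZE_MS):
    """Count PASS results per (order_id, window_start).

    events: iterable of (timestamp_ms, order_id, result) tuples.
    """
    counts = Counter()
    for ts_ms, order_id, result in events:
        if result == "PASS":
            counts[(order_id, window_start(ts_ms, size_ms))] += 1
    return dict(counts)


# Three PASS results for order 7, but the third lands in the next window.
events = [(59_000, 7, "PASS"), (59_500, 7, "PASS"), (60_200, 7, "PASS")]
print(windowed_pass_counts(events))  # {(7, 0): 2, (7, 60000): 1}
```

Under this assumption, an order whose three validations straddle a window boundary never reaches a count of 3 in any single window.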
  3. Join the orders from s_o with t_s_ov to update the order status to “VALIDATED” once the count is 3 or more (a count of 3 means all the validations passed).

Since ksqlDB doesn’t allow a join between the persistent table and the non-persistent stream, I created a stream from the topic “T_S_OV” (the topic is created internally as part of step 2):

CREATE STREAM s_t_s_ov  (id integer KEY, total integer )  WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
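Reading the table's topic as a stream means each change to a count arrives as its own record, so one order can appear with a total of 1, then 2, then 3. A minimal Python sketch of that changelog behaviour, ignoring windows and assuming no record caching (Kafka Streams caching can coalesce some intermediate updates); the function name and data are illustrative:

```python
def changelog(pass_order_ids):
    """Yield (order_id, total) for every count update, the way a table's
    changelog topic looks when read back as a stream. Windowing is ignored
    and no record caching is assumed."""
    totals = {}
    for order_id in pass_order_ids:
        totals[order_id] = totals.get(order_id, 0) + 1
        yield order_id, totals[order_id]


# Order IDs of incoming PASS results, in arrival order (illustrative data).
updates = list(changelog([5, 5, 6, 5]))
print(updates)  # [(5, 1), (5, 2), (6, 1), (5, 3)]

# Only the update that reaches 3 should trigger CREATED -> VALIDATED.
ready = [order_id for order_id, total in updates if total >= 3]
print(ready)  # [5]
```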
  4. Verify the results of step 3:



Details:



ksql> DROP STREAM IF EXISTS s_t_s_ov;
>CREATE STREAM s_t_s_ov  (id integer KEY, total integer )  WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;

DROP STREAM IF EXISTS s_t_s_ov;
 Message
------------------------------------------------
 Source `S_T_S_OV` (topic: T_S_OV) was dropped.
------------------------------------------------

CREATE STREAM s_t_s_ov  (id integer KEY, total integer )  WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
 Message
----------------
 Stream created
----------------
ksql> select * from s_t_s_ov emit changes limit 3;
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |TOTAL                                                   |
+--------------------------------------------------------+--------------------------------------------------------+


Press CTRL-C to interrupt

Does anyone know what I’m doing wrong? Or is there another way to achieve this?

Orders Stream

ksql> describe extended s_o;

Name                 : S_O
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : orders (partitions: 1, replication: 1)
Statement            : CREATE stream s_o  (id integer , quantity integer,price integer,customerId integer,OrderState varchar, product varchar )  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON')
;

 Field      | Type
------------------------------
 ID         | INTEGER
 QUANTITY   | INTEGER
 PRICE      | INTEGER
 CUSTOMERID | INTEGER
 ORDERSTATE | VARCHAR(STRING)
 PRODUCT    | VARCHAR(STRING)
------------------------------

Local runtime statistics
------------------------
consumer-messages-per-sec:      0.27 consumer-total-bytes:   6086118 consumer-total-messages:     34341     last-message: 2021-04-26T15:08:40.681Z
consumer-failed-messages:      5802 consumer-failed-messages-per-sec:         0      last-failed: 2021-04-23T04:13:57.008Z
(Statistics of the local KSQL server interaction with the Kafka topic orders)

Order-validations Stream

ksql> describe extended s_ov;

Name                 : S_OV
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : order-validations (partitions: 1, replication: 1)
Statement            : CREATE STREAM s_ov  (OrderValidation STRUCT <OrderValidationResult VARCHAR, OrderValidationType VARCHAR,orderId integer> )  WITH (KAFKA_TOPIC='order-validations', VALUE_FORMAT='JSON');

 Field           | Type                                                                                              
---------------------------------------------------------------------------------------------------------------------
 ORDERVALIDATION | STRUCT<ORDERVALIDATIONRESULT VARCHAR(STRING), ORDERVALIDATIONTYPE VARCHAR(STRING), ORDERID INTEGER>
---------------------------------------------------------------------------------------------------------------------

Queries that read from this STREAM
-----------------------------------
CTAS_T_S_OV_1_155 (RUNNING) : CREATE TABLE T_S_OV_1 WITH (KAFKA_TOPIC='T_S_OV_1', PARTITIONS=1, REPLICAS=1) AS SELECT   S_OV.ORDERVALIDATION->ORDERID ID,   COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS )  WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;
CTAS_T_S_OV_85 (RUNNING) : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT   S_OV.ORDERVALIDATION->ORDERID ORDERID,   COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS )  WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
consumer-messages-per-sec:      1.09 consumer-total-bytes:  10836043 consumer-total-messages:     80796     last-message: 2021-04-27T01:39:21.178Z
consumer-failed-messages:       704 consumer-failed-messages-per-sec:         0      last-failed: 2021-04-26T23:58:47.916Z
(Statistics of the local KSQL server interaction with the Kafka topic order-validations)

order-validation-table Aggregation

ksql> describe extended t_s_ov;

Name                 : T_S_OV
Type                 : TABLE
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : T_S_OV (partitions: 1, replication: 1)
Statement            : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT
  S_OV.ORDERVALIDATION->ORDERID ORDERID,
  COUNT(*) TOTAL
FROM S_OV S_OV
WINDOW TUMBLING ( SIZE 60 SECONDS )
WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS')
GROUP BY S_OV.ORDERVALIDATION->ORDERID
EMIT CHANGES;

 Field   | Type
------------------------------------------------------------------
 ORDERID | INTEGER          (primary key) (Window type: TUMBLING)
 TOTAL   | BIGINT
------------------------------------------------------------------

Queries that write from this TABLE
-----------------------------------
CTAS_T_S_OV_85 (RUNNING) : CREATE TABLE T_S_OV WITH (KAFKA_TOPIC='T_S_OV', PARTITIONS=1, REPLICAS=1) AS SELECT   S_OV.ORDERVALIDATION->ORDERID ORDERID,   COUNT(*) TOTAL FROM S_OV S_OV WINDOW TUMBLING ( SIZE 60 SECONDS )  WHERE (S_OV.ORDERVALIDATION->ORDERVALIDATIONRESULT = 'PASS') GROUP BY S_OV.ORDERVALIDATION->ORDERID EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:      0.09   total-messages:      7730     last-message: 2021-04-27T01:40:16.45Z

(Statistics of the local KSQL server interaction with the Kafka topic T_S_OV)

Stream from order-validations-table

ksql> describe extended s_t_s_ov;

Name                 : S_T_S_OV
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : T_S_OV (partitions: 1, replication: 1)
Statement            : CREATE STREAM s_t_s_ov  (id integer KEY, total integer )  WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;

 Field | Type
--------------------------------
 ID    | INTEGER          (key)
 TOTAL | INTEGER
--------------------------------

Local runtime statistics
------------------------
messages-per-sec:      0.09   total-messages:      7737     last-message: 2021-04-27T01:41:31.567Z

(Statistics of the local KSQL server interaction with the Kafka topic T_S_OV)

Notice that total-messages is 7737, yet the query doesn’t show any results.
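One plausible explanation, offered as an assumption to verify rather than a confirmed diagnosis: T_S_OV is a windowed table, and windowed keys are written as the serialized key followed by an 8-byte window start timestamp (the layout Kafka Streams' time-windowed serde uses). A stream declared with `id integer KEY` and KAFKA key format expects a 4-byte integer key, so a 12-byte key cannot be decoded and the row is skipped instead of shown; the ksqlDB processing log is the place to confirm this. If that is the cause, the ksqlDB CREATE STREAM documentation describes a WINDOW_TYPE property (with WINDOW_SIZE for tumbling windows) for declaring streams over windowed topics. The Python below only simulates the byte layout; the helper names and the window start value are hypothetical.

```python
import struct


def windowed_key_bytes(order_id, window_start_ms):
    """Key bytes for a time-windowed record, assuming the Kafka Streams
    layout: 4-byte big-endian INTEGER key + 8-byte big-endian window start."""
    return struct.pack(">i", order_id) + struct.pack(">q", window_start_ms)


def deserialize_kafka_int(data):
    """Mimic a KAFKA-format INTEGER key deserializer: exactly 4 bytes."""
    if len(data) != 4:
        raise ValueError(f"expected 4 bytes, got {len(data)}")
    return struct.unpack(">i", data)[0]


key = windowed_key_bytes(42, 1619487600000)  # illustrative window start
print(len(key))                  # 12
print(struct.unpack(">iq", key))  # (42, 1619487600000)
try:
    deserialize_kafka_int(key)
except ValueError as err:
    print("key not decoded:", err)  # key not decoded: expected 4 bytes, got 12
```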

I also tried another version of step 3, which returns null values for the order ID:


ksql> CREATE STREAM s_t_s_ov1  (id integer, total integer )  WITH (KAFKA_TOPIC='T_S_OV',VALUE_FORMAT='JSON') ;
>

 Message
----------------
 Stream created
----------------
ksql> select * from s_t_s_ov1 emit changes limit 3;
+--------------------------------------------------------+--------------------------------------------------------+
|ID                                                      |TOTAL                                                   |
+--------------------------------------------------------+--------------------------------------------------------+
|null                                                    |1                                                       |
|null                                                    |1                                                       |
|null                                                    |1                                                       |
Limit Reached
Query terminated

Note that I removed KEY from the statement.
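Dropping KEY sidesteps the key decoding, but ID is then looked up in the JSON value instead of the record key. Assuming the table writes its grouping column (ORDERID) only to the Kafka record key, which the null output suggests, the value holds just TOTAL and ID has nothing to bind to. A minimal Python sketch of that lookup; the payload is an assumed example, not captured output:

```python
import json

# Assumed value payload of a T_S_OV record: the grouping column lives in the
# Kafka record key, so the JSON value carries only the non-key column.
value = json.loads('{"TOTAL": 1}')

# Columns declared in: CREATE STREAM s_t_s_ov1 (id integer, total integer)
declared_columns = ["ID", "TOTAL"]
row = {col: value.get(col) for col in declared_columns}
print(row)  # {'ID': None, 'TOTAL': 1}
```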

(Cross-posted to Stack Overflow.)