NULL values are returned after JOIN in ksqlDB

Hey everyone! :wave:
I’m working on a financial crypto project. I have a Kafka topic that stores the latest prices for crypto assets, and a predefined list of combinations of those assets that I need to run calculations for. Each combination contains 3 assets; for instance, a combination could be BTCUSDT - LTCUSDT - BTCLTC. I need to perform calculations in ksqlDB using the latest price of each of those assets.
To store this list of combinations, I use a stream created with the following statement:

CREATE STREAM IF NOT EXISTS SCHEMAS_STREAM (
    SCHEMA_NAME STRING, SCHEMA_ID INTEGER, PURCHASE_CRYPTO_ID INTEGER, PURCHASE_CRYPTO_BASE STRING, PURCHASE_CRYPTO_QUOTE STRING,
    PURCHASE_CRYPTO_SYMBOL STRING, EXCHANGE_CRYPTO_ID INTEGER, EXCHANGE_CRYPTO_BASE STRING, EXCHANGE_CRYPTO_QUOTE STRING,
    EXCHANGE_CRYPTO_SYMBOL STRING, TO_STABLE_ID INTEGER, TO_STABLE_BASE STRING, TO_STABLE_QUOTE STRING,
    TO_STABLE_SYMBOL STRING
    ) WITH (KAFKA_TOPIC='schemas-topic', KEY_FORMAT='KAFKA', PARTITIONS=10, VALUE_FORMAT='JSON');

I attach it to a topic called schemas-topic. The topic has no records in it initially, and I populate the stream from code using INSERT INTO statements.
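For illustration, one such insert might look like the sketch below. The row is made up: the IDs and the way the three symbols are assigned to the PURCHASE / EXCHANGE / TO_STABLE legs are assumptions for the example, not real data.

```sql
-- Illustrative row only: the IDs and the assignment of symbols to the
-- PURCHASE / EXCHANGE / TO_STABLE legs are assumptions, not real data.
INSERT INTO SCHEMAS_STREAM (
    SCHEMA_NAME, SCHEMA_ID,
    PURCHASE_CRYPTO_ID, PURCHASE_CRYPTO_BASE, PURCHASE_CRYPTO_QUOTE, PURCHASE_CRYPTO_SYMBOL,
    EXCHANGE_CRYPTO_ID, EXCHANGE_CRYPTO_BASE, EXCHANGE_CRYPTO_QUOTE, EXCHANGE_CRYPTO_SYMBOL,
    TO_STABLE_ID, TO_STABLE_BASE, TO_STABLE_QUOTE, TO_STABLE_SYMBOL
) VALUES (
    'BTCUSDT-BTCLTC-LTCUSDT', 1,
    1, 'BTC', 'USDT', 'BTCUSDT',
    2, 'BTC', 'LTC', 'BTCLTC',
    3, 'LTC', 'USDT', 'LTCUSDT'
);
```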
To store prices, I first create a stream from the prices topic mentioned earlier, using the following statement:

CREATE STREAM IF NOT EXISTS PRICES_STREAM (
    UPDATE_AT TIMESTAMP, SYMBOL STRING, ASK_PRICE DOUBLE, BID_PRICE DOUBLE
    ) WITH (KAFKA_TOPIC='crypto-prices', KEY_FORMAT='KAFKA', PARTITIONS=10, VALUE_FORMAT='JSON');

Then I create a materialized view (table) that aggregates the stream and stores the latest price for each asset:

CREATE TABLE IF NOT EXISTS latest_prices_1 AS
    SELECT 
        LATEST_BY_OFFSET(update_at) as price_time,
        LATEST_BY_OFFSET(bid_price) as bid_price,
        LATEST_BY_OFFSET(ask_price) as ask_price,
        symbol as symbol
    FROM PRICES_STREAM
    GROUP BY symbol
    EMIT CHANGES;

I create 3 fully identical tables, because calculating a single combination requires 3 prices (in the example above, the prices for BTCUSDT, LTCUSDT and BTCLTC) and it’s not possible to use the same table in multiple JOIN clauses inside one query. That is why I had to create duplicate tables. The table in the snippet above is named latest_prices_1; the other 2 are named latest_prices_2 and latest_prices_3 respectively.
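For reference, the second copy would be created as sketched here; it is the same statement with only the table name changed, and latest_prices_3 is analogous:

```sql
-- Same aggregation as latest_prices_1; only the table name differs.
CREATE TABLE IF NOT EXISTS latest_prices_2 AS
    SELECT
        LATEST_BY_OFFSET(update_at) AS price_time,
        LATEST_BY_OFFSET(bid_price) AS bid_price,
        LATEST_BY_OFFSET(ask_price) AS ask_price,
        symbol AS symbol
    FROM PRICES_STREAM
    GROUP BY symbol
    EMIT CHANGES;
```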
The actual query is the following:

CREATE STREAM IF NOT EXISTS BINANCE_PROFIT_STREAM WITH (KAFKA_TOPIC='profit-topic', KEY_FORMAT='KAFKA', PARTITIONS=10, VALUE_FORMAT='JSON') AS SELECT
    schemas.SCHEMA_NAME, 
    schemas.PURCHASE_CRYPTO_SYMBOL,
    schemas.EXCHANGE_CRYPTO_SYMBOL,
    schemas.TO_STABLE_SYMBOL,
    p1.BID_PRICE,
    p1.ASK_PRICE,
    p2.BID_PRICE,
    p2.ASK_PRICE,
    p3.ASK_PRICE,
    p3.BID_PRICE,
    ((((100 / p1.BID_PRICE) / p2.BID_PRICE) * p3.ASK_PRICE - 100) / 100 * 100) as buy_buy_sell_profit,
    ((((100 / p1.BID_PRICE) * p2.ASK_PRICE) * p3.ASK_PRICE - 100) / 100 * 100) as buy_sell_sell_profit
    FROM
        SCHEMAS_STREAM as schemas
        JOIN latest_prices_1 p1 ON schemas.PURCHASE_CRYPTO_SYMBOL = p1.symbol
        JOIN latest_prices_2 p2 ON schemas.EXCHANGE_CRYPTO_SYMBOL = p2.symbol
        JOIN latest_prices_3 p3 ON schemas.TO_STABLE_SYMBOL = p3.symbol;

It returns results, but some fields are null even though they are present in other streams/tables.
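One way to check that a price really is present in a table is a pull query on its key, roughly like the sketch below. The symbol 'BTCUSDT' is only an example key, and the same lookup can be repeated against latest_prices_2 and latest_prices_3.

```sql
-- Hypothetical check: look up one symbol in the materialized table.
-- 'BTCUSDT' is just an example key, not a claim about the actual data.
SELECT symbol, bid_price, ask_price, price_time
FROM latest_prices_1
WHERE symbol = 'BTCUSDT';
```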
My questions are:

  1. What is wrong with my final query? Why are NULL values returned?
  2. Is there a better way to achieve what I need? I’m not sure that storing the list of combinations in a stream is the best approach, since this list won’t be updated. Creating 3 identical tables also seems more like a workaround to me.

Thanks in advance, guys, and have a nice day! :slightly_smiling_face:
