I’m working on a financial crypto project. I have a Kafka topic that stores the latest prices for crypto assets, and a predefined list of asset combinations I need to run calculations for. Each combination contains 3 assets, for instance: BTCUSDT - LTCUSDT - BTCLTC. I need to perform some calculations on the latest price of each of those assets using ksqlDB.
To store this list of combinations I use a stream which is created using the following command:
CREATE STREAM IF NOT EXISTS SCHEMAS_STREAM (
    SCHEMA_NAME STRING,
    SCHEMA_ID INTEGER,
    PURCHASE_CRYPTO_ID INTEGER,
    PURCHASE_CRYPTO_BASE STRING,
    PURCHASE_CRYPTO_QUOTE STRING,
    PURCHASE_CRYPTO_SYMBOL STRING,
    EXCHANGE_CRYPTO_ID INTEGER,
    EXCHANGE_CRYPTO_BASE STRING,
    EXCHANGE_CRYPTO_QUOTE STRING,
    EXCHANGE_CRYPTO_SYMBOL STRING,
    TO_STABLE_ID INTEGER,
    TO_STABLE_BASE STRING,
    TO_STABLE_QUOTE STRING,
    TO_STABLE_SYMBOL STRING
) WITH (KAFKA_TOPIC='schemas-topic', KEY_FORMAT='KAFKA', PARTITIONS=10, VALUE_FORMAT='JSON');
I attach it to a topic called schemas-topic. The topic itself does not have any records in it; I populate the stream from code using INSERT INTO statements.
To store prices, I first create a stream from the prices topic mentioned earlier using the following code:
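For illustration, a single insert might look like the sketch below. The IDs, the schema name, and the assignment of BTCUSDT, BTCLTC and LTCUSDT to the purchase / exchange / to-stable legs are made-up example values, not my real data.

```sql
-- Hypothetical example row: all IDs and the leg assignment are illustrative only.
INSERT INTO SCHEMAS_STREAM (
    SCHEMA_NAME, SCHEMA_ID,
    PURCHASE_CRYPTO_ID, PURCHASE_CRYPTO_BASE, PURCHASE_CRYPTO_QUOTE, PURCHASE_CRYPTO_SYMBOL,
    EXCHANGE_CRYPTO_ID, EXCHANGE_CRYPTO_BASE, EXCHANGE_CRYPTO_QUOTE, EXCHANGE_CRYPTO_SYMBOL,
    TO_STABLE_ID, TO_STABLE_BASE, TO_STABLE_QUOTE, TO_STABLE_SYMBOL
) VALUES (
    'BTCUSDT-BTCLTC-LTCUSDT', 1,
    10, 'BTC', 'USDT', 'BTCUSDT',
    20, 'BTC', 'LTC', 'BTCLTC',
    30, 'LTC', 'USDT', 'LTCUSDT'
);
```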
CREATE STREAM IF NOT EXISTS PRICES_STREAM ( UPDATE_AT TIMESTAMP, SYMBOL STRING, ASK_PRICE DOUBLE, BID_PRICE DOUBLE ) WITH (KAFKA_TOPIC='crypto-prices', KEY_FORMAT='KAFKA', PARTITIONS=10, VALUE_FORMAT='JSON');
Then I create a materialized view (table) that aggregates the stream and stores the latest price for each asset:
CREATE TABLE IF NOT EXISTS latest_prices_1 AS
SELECT
    LATEST_BY_OFFSET(update_at) as price_time,
    LATEST_BY_OFFSET(bid_price) as bid_price,
    LATEST_BY_OFFSET(ask_price) as ask_price,
    symbol as symbol
FROM PRICES_STREAM
GROUP BY symbol
EMIT CHANGES;
I create 3 fully identical tables because calculating a single combination requires 3 prices (in the example above: BTCUSDT, LTCUSDT and BTCLTC), and it’s not possible to use the same table in multiple JOIN clauses inside one query, so I had to create duplicates. The table in the snippet above is named latest_prices_1; the other two are named latest_prices_2 and latest_prices_3 respectively.
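For completeness, the two duplicate tables would be created roughly as follows, assuming they are exact copies of the latest_prices_1 definition with only the table name changed:

```sql
-- Copy of latest_prices_1; only the table name differs.
CREATE TABLE IF NOT EXISTS latest_prices_2 AS
SELECT
    LATEST_BY_OFFSET(update_at) AS price_time,
    LATEST_BY_OFFSET(bid_price) AS bid_price,
    LATEST_BY_OFFSET(ask_price) AS ask_price,
    symbol AS symbol
FROM PRICES_STREAM
GROUP BY symbol
EMIT CHANGES;

-- Second copy, again identical apart from the name.
CREATE TABLE IF NOT EXISTS latest_prices_3 AS
SELECT
    LATEST_BY_OFFSET(update_at) AS price_time,
    LATEST_BY_OFFSET(bid_price) AS bid_price,
    LATEST_BY_OFFSET(ask_price) AS ask_price,
    symbol AS symbol
FROM PRICES_STREAM
GROUP BY symbol
EMIT CHANGES;
```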
The actual query is the following:
CREATE STREAM IF NOT EXISTS BINANCE_PROFIT_STREAM
WITH (KAFKA_TOPIC='profit-topic', KEY_FORMAT='KAFKA', PARTITIONS=10, VALUE_FORMAT='JSON') AS
SELECT
    schemas.SCHEMA_NAME,
    schemas.PURCHASE_CRYPTO_SYMBOL,
    schemas.EXCHANGE_CRYPTO_SYMBOL,
    schemas.TO_STABLE_SYMBOL,
    p1.BID_PRICE,
    p1.ASK_PRICE,
    p2.BID_PRICE,
    p2.ASK_PRICE,
    p3.ASK_PRICE,
    p3.BID_PRICE,
    ((((100 / p1.BID_PRICE) / p2.BID_PRICE) * p3.ASK_PRICE - 100) / 100 * 100) as buy_buy_sell_profit,
    ((((100 / p1.BID_PRICE) * p2.ASK_PRICE) * p3.ASK_PRICE - 100) / 100 * 100) as buy_sell_sell_profit
FROM SCHEMAS_STREAM as schemas
JOIN latest_prices_1 p1 ON schemas.PURCHASE_CRYPTO_SYMBOL = p1.symbol
JOIN latest_prices_2 p2 ON schemas.EXCHANGE_CRYPTO_SYMBOL = p2.symbol
JOIN latest_prices_3 p3 ON schemas.TO_STABLE_SYMBOL = p3.symbol;
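To make the two profit expressions concrete, here is a worked example with made-up prices (purely illustrative, not real market data). Writing b_i for p_i.BID_PRICE and a_i for p_i.ASK_PRICE, and assuming b_1 = 2, b_2 = 5, a_2 = 0.2, a_3 = 11:

```latex
\begin{align*}
\text{buy\_buy\_sell\_profit}
  &= \frac{\frac{100 / b_1}{b_2} \cdot a_3 - 100}{100} \cdot 100 \\
  &= \frac{\frac{100 / 2}{5} \cdot 11 - 100}{100} \cdot 100
   = \frac{110 - 100}{100} \cdot 100 = 10 \\[4pt]
\text{buy\_sell\_sell\_profit}
  &= \frac{\frac{100}{b_1} \cdot a_2 \cdot a_3 - 100}{100} \cdot 100 \\
  &= \frac{\frac{100}{2} \cdot 0.2 \cdot 11 - 100}{100} \cdot 100
   = \frac{110 - 100}{100} \cdot 100 = 10
\end{align*}
```

Note that the trailing / 100 * 100 cancels out, so each expression is simply the gain on a starting amount of 100, which can be read as a percentage.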
It returns results, but some fields are NULL even though the corresponding values are present in the source streams/tables.
My questions are:
- What is wrong with my final query? Why are NULL values returned?
- Is there a better way to achieve what I need to do? I’m not sure that storing the list of combinations in a stream is the best approach, since this list won’t be updated. Creating 3 identical tables also seems more like a workaround to me.
Thanks in advance guys and have a nice day!!!