Hey everyone!
I’m working on a crypto finance project. I have a Kafka topic that stores the latest prices of crypto assets, and a list of predefined combinations of those assets that I need to run calculations for. Each combination contains 3 assets. For instance, a combination could look like this: BTCUSDT - LTCUSDT - BTCLTC. I need to perform calculations on the latest prices of each of those assets using ksqlDB.
To store this list of combinations, I use a stream created with the following statement:
CREATE STREAM IF NOT EXISTS SCHEMAS_STREAM (
SCHEMA_NAME STRING, SCHEMA_ID INTEGER, PURCHASE_CRYPTO_ID INTEGER, PURCHASE_CRYPTO_BASE STRING, PURCHASE_CRYPTO_QUOTE STRING,
PURCHASE_CRYPTO_SYMBOL STRING, EXCHANGE_CRYPTO_ID INTEGER, EXCHANGE_CRYPTO_BASE STRING, EXCHANGE_CRYPTO_QUOTE STRING,
EXCHANGE_CRYPTO_SYMBOL STRING, TO_STABLE_ID INTEGER, TO_STABLE_BASE STRING, TO_STABLE_QUOTE STRING,
TO_STABLE_SYMBOL STRING
) WITH (KAFKA_TOPIC='schemas-topic', KEY_FORMAT='KAFKA', PARTITIONS=10, VALUE_FORMAT='JSON');
The stream is backed by a topic called schemas-topic, which starts out empty. I populate the stream from code using INSERT INTO statements.
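A minimal sketch of how such INSERT INTO statements could be generated from code. It assumes plain string building in Python; `sql_literal` and `insert_statement` are hypothetical helpers, only a subset of the SCHEMAS_STREAM columns is filled in for brevity, the values are illustrative, and mapping BTCUSDT / LTCUSDT / BTCLTC onto the PURCHASE / EXCHANGE / TO_STABLE symbol columns in that order is an assumption.

```python
# Hypothetical helpers for generating ksqlDB INSERT INTO statements from code.
# Column names come from SCHEMAS_STREAM; the values are illustrative only.


def sql_literal(value):
    """Render a Python value as a SQL literal (strings single-quoted, ' doubled)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def insert_statement(stream, row):
    """Build 'INSERT INTO <stream> (cols) VALUES (vals);' in the dict's key order."""
    columns = ", ".join(row)
    values = ", ".join(sql_literal(v) for v in row.values())
    return f"INSERT INTO {stream} ({columns}) VALUES ({values});"


# Assumed mapping of the BTCUSDT - LTCUSDT - BTCLTC example onto the three legs.
combination = {
    "SCHEMA_NAME": "BTCUSDT-LTCUSDT-BTCLTC",
    "SCHEMA_ID": 1,
    "PURCHASE_CRYPTO_SYMBOL": "BTCUSDT",
    "EXCHANGE_CRYPTO_SYMBOL": "LTCUSDT",
    "TO_STABLE_SYMBOL": "BTCLTC",
}

if __name__ == "__main__":
    print(insert_statement("SCHEMAS_STREAM", combination))
```

Building the column list explicitly keeps each statement readable and makes it obvious which columns a given insert actually sets.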
To store prices, I first create a stream over the crypto-prices topic mentioned earlier:
CREATE STREAM IF NOT EXISTS PRICES_STREAM (
UPDATE_AT TIMESTAMP, SYMBOL STRING, ASK_PRICE DOUBLE, BID_PRICE DOUBLE
) WITH (KAFKA_TOPIC='crypto-prices', KEY_FORMAT='KAFKA', PARTITIONS=10, VALUE_FORMAT='JSON');
Then I create a materialized view (table) that aggregates the stream and stores the latest price for each asset:
CREATE TABLE IF NOT EXISTS latest_prices_1 AS
SELECT
LATEST_BY_OFFSET(update_at) as price_time,
LATEST_BY_OFFSET(bid_price) as bid_price,
LATEST_BY_OFFSET(ask_price) as ask_price,
symbol as symbol
FROM PRICES_STREAM
GROUP BY symbol
EMIT CHANGES;
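To make the intent of the table explicit, here is a simplified in-memory model of what latest_prices_1 holds: one row per SYMBOL, carrying the values of the most recent record for that symbol. This is only an illustration, not how ksqlDB works internally; it assumes every record carries all four fields and lets list order stand in for Kafka offset order. The prices are dummy numbers and `latest_by_symbol` is a hypothetical name.

```python
# Simplified model of GROUP BY symbol + LATEST_BY_OFFSET: keep, per symbol,
# the values from the most recently processed record.
# Assumes list order stands in for offset order and no field is null.


def latest_by_symbol(price_records):
    latest = {}
    for record in price_records:
        # A later record for the same symbol overwrites the earlier values.
        latest[record["SYMBOL"]] = {
            "PRICE_TIME": record["UPDATE_AT"],
            "BID_PRICE": record["BID_PRICE"],
            "ASK_PRICE": record["ASK_PRICE"],
        }
    return latest


# Dummy records; UPDATE_AT values are plain integers standing in for timestamps.
records = [
    {"UPDATE_AT": 1, "SYMBOL": "BTCUSDT", "ASK_PRICE": 10.5, "BID_PRICE": 10.0},
    {"UPDATE_AT": 2, "SYMBOL": "LTCUSDT", "ASK_PRICE": 2.1, "BID_PRICE": 2.0},
    {"UPDATE_AT": 3, "SYMBOL": "BTCUSDT", "ASK_PRICE": 11.5, "BID_PRICE": 11.0},
]

if __name__ == "__main__":
    for symbol, row in latest_by_symbol(records).items():
        print(symbol, row)
```

In ksqlDB each LATEST_BY_OFFSET column is aggregated separately, so overwriting the whole row as above only matches the table when no field in the incoming records is null.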
I create 3 identical tables because calculating a single combination requires 3 prices (in the example above, the prices for BTCUSDT, LTCUSDT and BTCLTC), and it’s not possible to use the same table in multiple JOIN clauses within one query, so I had to create duplicates. In the snippet above the table is named latest_prices_1; the other 2 are called latest_prices_2 and latest_prices_3.
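The three joins can be modelled as three lookups of one combination against the latest-price state. In ksqlDB stream-table joins, output is produced when a record arrives on the stream side, and each table contributes whatever row it holds for the key at that moment; a later table update does not re-emit rows for stream records that were already processed. The sketch below assumes inner-join semantics and uses one dict looked up three times (in the ksqlDB version each lookup goes to its own table copy). `join_combination` and the `P1_`/`P2_`/`P3_` output names are hypothetical, and the prices are dummy values.

```python
# Simplified model of the three stream-table inner joins: a combination record
# is matched against the latest-price state as it exists when it is processed.


def join_combination(combination, latest):
    """Return the joined row, or None if any of the three symbols has no price."""
    legs = ("PURCHASE_CRYPTO_SYMBOL", "EXCHANGE_CRYPTO_SYMBOL", "TO_STABLE_SYMBOL")
    prices = [latest.get(combination[leg]) for leg in legs]
    if any(price is None for price in prices):
        return None  # inner join: this combination produces no output row
    row = {"SCHEMA_NAME": combination["SCHEMA_NAME"]}
    for prefix, price in zip(("P1", "P2", "P3"), prices):
        row[prefix + "_BID_PRICE"] = price["BID_PRICE"]
        row[prefix + "_ASK_PRICE"] = price["ASK_PRICE"]
    return row


combination = {
    "SCHEMA_NAME": "BTCUSDT-LTCUSDT-BTCLTC",
    "PURCHASE_CRYPTO_SYMBOL": "BTCUSDT",
    "EXCHANGE_CRYPTO_SYMBOL": "LTCUSDT",
    "TO_STABLE_SYMBOL": "BTCLTC",
}
latest = {
    "BTCUSDT": {"BID_PRICE": 10.0, "ASK_PRICE": 10.5},
    "LTCUSDT": {"BID_PRICE": 2.0, "ASK_PRICE": 2.1},
}

if __name__ == "__main__":
    # BTCLTC has no price yet, so this combination record yields no row.
    print(join_combination(combination, latest))
    # Once BTCLTC has a price, a *new* combination record would join successfully.
    latest["BTCLTC"] = {"BID_PRICE": 5.0, "ASK_PRICE": 5.2}
    print(join_combination(combination, latest))
```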
The actual query is the following:
CREATE STREAM IF NOT EXISTS BINANCE_PROFIT_STREAM WITH (KAFKA_TOPIC='profit-topic', KEY_FORMAT='KAFKA', PARTITIONS=10, VALUE_FORMAT='JSON') AS SELECT
schemas.SCHEMA_NAME,
schemas.PURCHASE_CRYPTO_SYMBOL,
schemas.EXCHANGE_CRYPTO_SYMBOL,
schemas.TO_STABLE_SYMBOL,
p1.BID_PRICE,
p1.ASK_PRICE,
p2.BID_PRICE,
p2.ASK_PRICE,
p3.ASK_PRICE,
p3.BID_PRICE,
((((100 / p1.BID_PRICE) / p2.BID_PRICE) * p3.ASK_PRICE - 100) / 100 * 100) as buy_buy_sell_profit,
((((100 / p1.BID_PRICE) * p2.ASK_PRICE) * p3.ASK_PRICE - 100) / 100 * 100) as buy_sell_sell_profit
FROM
SCHEMAS_STREAM as schemas
JOIN latest_prices_1 p1 ON schemas.PURCHASE_CRYPTO_SYMBOL = p1.symbol
JOIN latest_prices_2 p2 ON schemas.EXCHANGE_CRYPTO_SYMBOL = p2.symbol
JOIN latest_prices_3 p3 ON schemas.TO_STABLE_SYMBOL = p3.symbol;
It returns results, but some fields are NULL even though the corresponding values are present in the source streams/tables.
My questions are:
- What is wrong with my final query? Why are NULL values returned?
- Is there a better way to achieve this? I’m not sure storing the list of combinations in a stream is the best approach, since this list won’t be updated. Creating 3 identical tables also feels like a workaround.
Thanks in advance guys and have a nice day!!!