I’m having some trouble understanding the results I’m getting when joining a table and a stream. I have a stream defined like this:
CREATE STREAM PositionUpdatesStream
(
ROWKEY STRING KEY,
TradeKey STRING,
ActionKey STRING,
BatchType STRING,
EodId STRING,
EodDate STRING,
Position STRUCT<
PositionKey STRUCT<
ProductId INT,
ContractId INT,
AccountId INT,
Strategy STRING,
CurrencyCode STRING
>,
Bod STRUCT<
Quantity decimal(19,4),
CostBasis decimal(19,4)
>,
DaySell STRUCT<
Quantity decimal(19,4),
CostBasis decimal(19,4)
>,
DayBuy STRUCT<
Quantity decimal(19,4),
CostBasis decimal(19,4)
>,
PrincipalActivity STRUCT<
Quantity decimal(19,4),
CostBasis decimal(19,4)
>,
Total STRUCT<
Quantity decimal(19,4),
CostBasis decimal(19,4)
>,
SettleCurrency STRING,
StaticMarketValueBod decimal(19,4),
StaticMarketValueBodBase decimal(19,4),
StaticCostBasisBodBase decimal(19,4)
>,
CashBalance STRUCT<
CashBalanceKey STRUCT<
AccountId INT,
CurrencyCode STRING
>,
Bod STRUCT<
TradeDateAmount decimal(19,4),
SettleDateAmount decimal(19,4)
>,
Intraday STRUCT<
TradeDateAmount decimal(19,4),
SettleDateAmount decimal(19,4)
>,
PrincipalActivity STRUCT<
TradeDateAmount decimal(19,4),
SettleDateAmount decimal(19,4)
>,
Total STRUCT<
TradeDateAmount decimal(19,4),
SettleDateAmount decimal(19,4)
>
>
) WITH (kafka_topic='bxc.ibor.position.updates', value_format='JSON');
Then I create a table that counts the rows and sums two numeric values, grouped by CURRENCYCODE:
CREATE TABLE POSITIONAGGBYCURRENCY WITH (KAFKA_TOPIC='POSITIONAGGBYCURRENCY', PARTITIONS=1, REPLICAS=1) AS SELECT
POSITIONUPDATESSTREAM.POSITION->POSITIONKEY->CURRENCYCODE CURRENCYCODE,
COUNT(*) POSITIONCOUNT,
SUM(POSITIONUPDATESSTREAM.POSITION->TOTAL->QUANTITY) TOTALQUANTITY,
SUM(POSITIONUPDATESSTREAM.POSITION->TOTAL->COSTBASIS) TOTALCOST
FROM POSITIONUPDATESSTREAM POSITIONUPDATESSTREAM
GROUP BY POSITIONUPDATESSTREAM.POSITION->POSITIONKEY->CURRENCYCODE
EMIT CHANGES;
If I query this table (without emit changes) I get what I expect:
ksql> select CurrencyCode, PositionCount from PositionAggByCurrency;
+-------------------------------------------------------------------------+-------------
|CURRENCYCODE |POSITIONCOUNT
+-------------------------------------------------------------------------+--------------
|AUD |12
|CAD |10
|CHF |1
|CZK |1
|DKK |2
|EUR |9147
|GBP |331
|JPY |5
|NOK |1
|USD |95617
Query terminated
ksql>
My question arises when I try to join this table with the original stream on CurrencyCode (after running SET 'auto.offset.reset'='earliest'). I expected every stream row where the currency is ‘USD’ (for example) to be output with a count of 95,617. Instead, the value of the PositionCount column changes as rows are displayed from this query:
ksql> select pu.ROWKEY, pu.Position->PositionKey->CurrencyCode, pac.POSITIONCOUNT
>from PositionUpdatesStream pu
>join PositionAggByCurrency pac on pu.Position->PositionKey->CurrencyCode = pac.CURRENCYCODE
>where pu.ROWKEY like 'P%' emit changes;
...
|P-142433--USD-30468-Default |USD |55035 |
|P-142433--USD-30534-Default |USD |55035 |
...
|P-166626--USD-32513-Default |USD |65115 |
|P-166630--USD-16927-Default |USD |65115 |
...
|P-228390--USD-31538-Default |USD |95609 |
|P-228392--USD-16910-Default |USD |95610 |
|P-228392--USD-24055-Default |USD |95611 |
|P-228392--USD-24056-Default |USD |95612 |
|P-228392--USD-31150-Default |USD |95613 |
|P-228392--USD-32178-Default |USD |95614 |
|P-228393--USD-16910-Default |USD |95615 |
|P-228393--USD-24055-Default |USD |95616 |
|P-228393--USD-24056-Default |USD |95617 |
|P-228393--USD-31150-Default |USD |95618 |
|P-228401--USD-32513-Default |USD |95619 |
|P-228415--USD-31538-Default |USD |95620 |
|P-124098--USD-16976-Default |USD |95621 |
|P-122997--USD-31206-Default |USD |95559 |
Press CTRL-C to interrupt
As I mentioned, my expectation is that the join would output 95,617 as the PositionCount for every emitted row where the currency is ‘USD’. Since the PositionCount values change as the joined results are output, I’m guessing the join is reading the table’s backing changelog topic rather than the materialized RocksDB state. Finally, the last row looks strange: the PositionCount values increased (or stayed the same) on every row until the very last one, where the count drops from 95,621 to 95,559. I must be missing something basic here. I appreciate any help.
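In case it helps anyone reproduce this, here is a minimal sketch of the offset setting mentioned above, together with the single-key lookup I would compare the join output against. It assumes CURRENCYCODE is the table’s key column (as the GROUP BY suggests). I have left the output out.

```sql
-- Read from the beginning of the topic for queries started after this point.
SET 'auto.offset.reset'='earliest';

-- Pull query against the materialized table for a single key.
-- Assumption: CURRENCYCODE is the key column of POSITIONAGGBYCURRENCY.
SELECT CURRENCYCODE, POSITIONCOUNT
FROM POSITIONAGGBYCURRENCY
WHERE CURRENCYCODE = 'USD';
```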