Unexpected Results from Stream-Table Join

I’m having some trouble understanding the results I’m getting when joining a table and a stream. I have a stream defined like this:

CREATE STREAM PositionUpdatesStream
(
    ROWKEY STRING KEY, 
    TradeKey STRING,
    ActionKey STRING,
    BatchType STRING,
    EodId STRING,
    EodDate STRING,
    Position STRUCT<
        PositionKey STRUCT<
            ProductId INT,
            ContractId INT,
            AccountId INT,
            Strategy STRING,
            CurrencyCode STRING
        >,
        Bod STRUCT<
            Quantity decimal(19,4),
            CostBasis decimal(19,4)
        >,
        DaySell STRUCT<
            Quantity decimal(19,4),
            CostBasis decimal(19,4)
        >,
        DayBuy STRUCT<
            Quantity decimal(19,4),
            CostBasis decimal(19,4)
        >,
        PrincipalActivity STRUCT<
            Quantity decimal(19,4),
            CostBasis decimal(19,4)
        >,
        Total STRUCT<
            Quantity decimal(19,4),
            CostBasis decimal(19,4)
        >,
        SettleCurrency STRING,
        StaticMarketValueBod decimal(19,4),
        StaticMarketValueBodBase decimal(19,4),
        StaticCostBasisBodBase decimal(19,4)
    >,
    CashBalance STRUCT<
        CashBalanceKey STRUCT<
            AccountId INT,
            CurrencyCode STRING
        >,
        Bod STRUCT<
            TradeDateAmount decimal(19,4),
            SettleDateAmount decimal(19,4)
        >,
        Intraday STRUCT<
            TradeDateAmount decimal(19,4),
            SettleDateAmount decimal(19,4)
        >,
        PrincipalActivity STRUCT<
            TradeDateAmount decimal(19,4),
            SettleDateAmount decimal(19,4)
        >,
        Total STRUCT<
            TradeDateAmount decimal(19,4),
            SettleDateAmount decimal(19,4)
        >
    >
) WITH (kafka_topic='bxc.ibor.position.updates', value_format='JSON');

Then I create a table that counts the rows and sums two numeric values, grouped by CurrencyCode:

CREATE TABLE POSITIONAGGBYCURRENCY WITH (KAFKA_TOPIC='POSITIONAGGBYCURRENCY', PARTITIONS=1, REPLICAS=1) AS SELECT
  POSITIONUPDATESSTREAM.POSITION->POSITIONKEY->CURRENCYCODE CURRENCYCODE,
  COUNT(*) POSITIONCOUNT,
  SUM(POSITIONUPDATESSTREAM.POSITION->TOTAL->QUANTITY) TOTALQUANTITY,
  SUM(POSITIONUPDATESSTREAM.POSITION->TOTAL->COSTBASIS) TOTALCOST
FROM POSITIONUPDATESSTREAM POSITIONUPDATESSTREAM
GROUP BY POSITIONUPDATESSTREAM.POSITION->POSITIONKEY->CURRENCYCODE
EMIT CHANGES;

If I query this table (without EMIT CHANGES), I get what I expect:

ksql> select CurrencyCode, PositionCount from PositionAggByCurrency;
+-------------------------------------------------------------------------+-------------
|CURRENCYCODE                                                             |POSITIONCOUNT
+-------------------------------------------------------------------------+--------------
|AUD                                                                      |12
|CAD                                                                      |10
|CHF                                                                      |1
|CZK                                                                      |1
|DKK                                                                      |2
|EUR                                                                      |9147
|GBP                                                                      |331
|JPY                                                                      |5
|NOK                                                                      |1
|USD                                                                      |95617
Query terminated
ksql>

My question arises when I try to join this table with the original stream on the CurrencyCode (with SET 'auto.offset.reset'='earliest'). I expected all the stream values where the currency is ‘USD’ (for example) to be output with a count of 95,617. But I see the value of the PositionCount column change periodically as rows are displayed from this query:

ksql> select pu.ROWKEY, pu.Position->PositionKey->CurrencyCode, pac.POSITIONCOUNT
>from PositionUpdatesStream pu
>join PositionAggByCurrency pac on pu.Position->PositionKey->CurrencyCode = pac.CURRENCYCODE
>where pu.ROWKEY like 'P%' emit changes;

...
|P-142433--USD-30468-Default                     |USD                                             |55035                                           |
|P-142433--USD-30534-Default                     |USD                                             |55035                                           |

...

|P-166626--USD-32513-Default                     |USD                                             |65115                                           |
|P-166630--USD-16927-Default                     |USD                                             |65115                                           |

...

|P-228390--USD-31538-Default                     |USD                                             |95609                                           |
|P-228392--USD-16910-Default                     |USD                                             |95610                                           |
|P-228392--USD-24055-Default                     |USD                                             |95611                                           |
|P-228392--USD-24056-Default                     |USD                                             |95612                                           |
|P-228392--USD-31150-Default                     |USD                                             |95613                                           |
|P-228392--USD-32178-Default                     |USD                                             |95614                                           |
|P-228393--USD-16910-Default                     |USD                                             |95615                                           |
|P-228393--USD-24055-Default                     |USD                                             |95616                                           |
|P-228393--USD-24056-Default                     |USD                                             |95617                                           |
|P-228393--USD-31150-Default                     |USD                                             |95618                                           |
|P-228401--USD-32513-Default                     |USD                                             |95619                                           |
|P-228415--USD-31538-Default                     |USD                                             |95620                                           |
|P-124098--USD-16976-Default                     |USD                                             |95621                                           |
|P-122997--USD-31206-Default                     |USD                                             |95559                                           |

Press CTRL-C to interrupt

As I mentioned, my expectation is that the join would output 95,617 as the PositionCount for every emitted row where the currency is ‘USD’. As the values of PositionCount appear to change while the joined results are output, I’m guessing it is joining against the table’s backing changelog topic rather than the RocksDB materialized table. Finally, the last row produced seems strange: the PositionCount values increased (or stayed the same) until the very last row, where the value dropped. I must be missing something basic here. I appreciate any help.

Correct, when you query the table to verify the aggregation result, RocksDB is used. This verification uses the “now” version of the table. However, the downstream join reads the data from the topic and replays the history of the table.

Note that stream-table joins are inherently temporal. The table is therefore not “bootstrapped”; instead, it is kept “time synchronized” with your stream. (If we “bootstrapped” the table, we would move the table into the future and compute an incorrect join result.) If you start with an empty table and an empty stream, and you put a record into the stream, it will join against the empty table. If you next update the table and then write a second stream record, the second stream record will join against a table with one row. The underlying principle is that if you stop your query and re-run it (with auto.offset.reset set to “earliest”), you want the same result: you don’t want the first stream record to see anything in the table, because the table was updated after the first stream record.

Your stream-table join does the same thing. It takes into account when each aggregation result was computed, and it decides which version of the table a stream record joins against based on the stream-record and table-record timestamps.
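To make the timestamp alignment above concrete, here is a minimal Python sketch of the idea. It is a simplified model, not how ksqlDB or Kafka Streams is actually implemented. The function names and the sample data are made up for illustration. It assumes each table update carries the timestamp of the stream record that triggered it, and that when timestamps are equal the table update is applied first; the real runtime synchronizes its inputs on a best-effort basis and does not guarantee that tie-break.

```python
def aggregate_changelog(stream):
    """Model COUNT(*) ... GROUP BY key: one table update per input record.

    Each update is (timestamp, key, running_count) and reuses the timestamp
    of the stream record that caused it (an assumption of this sketch).
    """
    counts = {}
    changelog = []
    for ts, key in stream:
        counts[key] = counts.get(key, 0) + 1
        changelog.append((ts, key, counts[key]))
    return changelog


def stream_table_join(stream, changelog):
    """Replay both inputs in timestamp order and join stream records
    against the table state as of that point in time."""
    TABLE, STREAM = 0, 1  # at equal timestamps, table updates go first (assumption)
    events = [(ts, TABLE, key, count) for ts, key, count in changelog]
    events += [(ts, STREAM, key, None) for ts, key in stream]
    events.sort(key=lambda e: (e[0], e[1]))

    table = {}   # table state at the current position in time
    joined = []
    for ts, kind, key, count in events:
        if kind == TABLE:
            table[key] = count                     # table moves forward in time
        elif key in table:                         # inner join: skip if no row yet
            joined.append((ts, key, table[key]))
    return joined


# Hypothetical input: (timestamp, currency code) per position update.
stream = [(1, "USD"), (2, "EUR"), (3, "USD"), (4, "USD")]

changelog = aggregate_changelog(stream)
joined = stream_table_join(stream, changelog)

# The "now" view of the table, which is what a query without EMIT CHANGES reads.
final_table = {key: count for _, key, count in changelog}

print(joined)
print(final_table)
```

In this model the final table holds a count of 3 for USD, but the three USD stream records join against counts 1, 2 and 3, because each one sees the table as it was at its own timestamp. That is the same pattern as the steadily increasing PositionCount in the join output above.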

Cf. Temporal-Joins in Kafka Streams and ksqlDB | Matthias Sax, Confluent