Project a list of records from a stream

Let’s say I’m tracking deliveries, where the message for each delivery is:

input
topic: deliveries
key: deliveryId
value: { driver: 'John Doe', status: 'delivering', location: 'latlong' }

I’d like to have a projection which produces a list of all deliveries with the statuses ‘collecting’ or ‘delivering’:

output
topic: activeDeliveries
value: [
{ deliveryId: 12345, driver: 'John Doe', status: 'delivering', location: 'latlong' },
{ deliveryId: 12346, driver: 'Jane Doe', status: 'collecting', location: 'latlong' }
]

How do I produce this list using the Kafka Streams DSL?

It seems as though I want a table of deliveries. When any one of the deliveries changes its status or location, the table should be updated and the list re-projected to activeDeliveries.

output
topic: activeDeliveries
value: [
{ deliveryId: 12345, driver: 'John Doe', status: 'delivering', location: 'latlong' },
{ deliveryId: 12346, driver: 'Jane Doe', status: 'delivering', location: 'latlong' }
]

Then, when a delivery is complete, it’ll be removed from the table. (I’ll need to use null values to remove rows.)

output
topic: activeDeliveries
value: [
{ deliveryId: 12345, driver: 'John Doe', status: 'delivered', location: 'latlong' }, <- will be removed
{ deliveryId: 12346, driver: 'Jane Doe', status: 'delivering', location: 'latlong' }
]

In other words - can I just output a stream of the whole table when any of the rows change?

A note: I’d be using a GlobalKTable, which does not have .to() or .toStream() methods, whereas an ordinary KTable would have done the trick.
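For context, this is roughly the emit step I have in mind; it works on a plain KTable but has no GlobalKTable equivalent (Delivery and deliverySerde are just placeholders for my actual value type and serde):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

// Read the deliveries topic as a table keyed by deliveryId.
// deliverySerde stands in for whatever serde the delivery JSON uses.
KTable<String, Delivery> deliveries = builder.table(
    "deliveries",
    Consumed.with(Serdes.String(), deliverySerde));

// A KTable can be turned back into a changelog stream and written out,
// which is the part a GlobalKTable does not offer.
deliveries
    .toStream()
    .to("activeDeliveries", Produced.with(Serdes.String(), deliverySerde));
```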

This feels like a simple use case, but with further research it seems I might need to dig down into the processor API to create a custom Processor.

A projection removes fields/columns… Sounds like you want a filter? Not sure what you mean by “list of all deliveries”? The output example is not totally clear to me, but it seems you actually want to aggregate individual deliveries into a single record? (Just wondering why?)

Reading further, it seems you want to apply a filter and just store each record in a table (i.e., no aggregation). In this case, the key would be preserved.
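A minimal sketch of that filter step, assuming a placeholder Delivery value type with a getStatus() accessor and a matching deliverySerde (neither is part of your post):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// Read the deliveries topic as a KTable; the key (deliveryId) is preserved.
KTable<String, Delivery> deliveries = builder.table(
    "deliveries",
    Consumed.with(Serdes.String(), deliverySerde));

// Keep only active deliveries. KTable#filter forwards a tombstone for a key
// once its record no longer matches the predicate, so deletes happen
// automatically when a delivery leaves the 'collecting'/'delivering' states.
KTable<String, Delivery> activeDeliveries = deliveries.filter(
    (deliveryId, delivery) ->
        "collecting".equals(delivery.getStatus())
            || "delivering".equals(delivery.getStatus()));
```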

For the GlobalKTable question: a GlobalKTable holds read-only data, so you will need a two-step process: first, apply the filter and write the result back into a topic; second, read that topic into a GlobalKTable.
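Continuing the sketch above, those two steps could look roughly like this (topic name taken from your example):

```java
// Step 1: write the filtered table's changelog back to a topic.
activeDeliveries
    .toStream()
    .to("activeDeliveries", Produced.with(Serdes.String(), deliverySerde));

// Step 2: read that topic into a GlobalKTable.
GlobalKTable<String, Delivery> activeDeliveriesGlobal = builder.globalTable(
    "activeDeliveries",
    Consumed.with(Serdes.String(), deliverySerde));
```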

Not sure why you need/want a GlobalKTable though.

For the filter step, you would need to read the data as a KTable and apply the filter, to make sure you get tombstones when records should be removed. If you processed the data as a KStream instead, you could use mapValues() and emit a null value for records that should be dropped.
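A sketch of that KStream variant, again with the placeholder Delivery/deliverySerde names; records mapped to null are written as tombstones, which downstream table readers treat as deletes:

```java
// Continuing with the same builder and placeholders as above.
KStream<String, Delivery> deliveryStream = builder.stream(
    "deliveries",
    Consumed.with(Serdes.String(), deliverySerde));

deliveryStream
    // Emit the record unchanged if the delivery is still active,
    // otherwise emit a null value so the key gets deleted downstream.
    .mapValues(delivery ->
        "collecting".equals(delivery.getStatus())
            || "delivering".equals(delivery.getStatus())
                ? delivery
                : null)
    .to("activeDeliveries", Produced.with(Serdes.String(), deliverySerde));
```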