Message headers and state stores

Hi all :slightly_smiling_face:

We’re currently adding Kafka headers for traceability in our projects. However, we’re struggling to keep the headers when storing Avro records in state stores. Does anyone have pointers on how we could achieve this? Any input is highly appreciated.

Rough draft of the application we have in mind:

// Topology example (pseudocode):
val builder = StreamsBuilder()

builder.addStateStore(
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentTimestampedKeyValueStore(STORE_NAME),
        Serdes.String(),
        SpecificAvroSerde<AvroRecord>() // How can we find a Serde that serializes/deserializes an object containing both the headers and the AvroRecord?
    )
)

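// The store name is passed to process() so the processor is connected to the store added above.
builder.stream(Consume...).process(ProcessorSupplier { StoreRecordProcessor() }, STORE_NAME).to(Produce...)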

// StoreRecordProcessor example:
private lateinit var store: TimestampedKeyValueStore<String, AvroRecord>

override fun init(context: ProcessorContext<String, AvroRecord>?) {
    super.init(context)

    store = context().getStateStore(stateStoreName)
}

override fun process(record: Record<String, AvroRecord>) {
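    // A timestamped store holds ValueAndTimestamp<V>, so the value is wrapped together with the record timestamp.
    store.put("test", ValueAndTimestamp.make(record.value(), record.timestamp())) // How could we store, for example, the whole Record object instead of only its value?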
}

If you are already using Avro, couldn’t you define a new message type, say AvroRecordWithHeaders, with three fields: a byte array for the wrapped record, a field for the timestamp, and a collection of string/byte-array pairs for the headers? You would then use a serde for the new type as the value serde of your state store. You can do exactly the same with Protobuf, JSON, or Java serialization…
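
To make the non-Avro variant a bit more concrete, here is a minimal sketch of such an envelope using only the Kotlin/Java standard library. The names StoredRecord, encode and decode are made up for illustration (they are not Kafka or Avro APIs), and the byte layout is just one possible choice. It assumes the wrapped value has already been turned into bytes, for example by your existing Avro serializer.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.DataInputStream
import java.io.DataOutputStream

// Hypothetical envelope type: holds the three fields described above.
class StoredRecord(
    val valueBytes: ByteArray,                   // wrapped record, already serialized
    val timestamp: Long,                         // record timestamp
    val headers: List<Pair<String, ByteArray?>>  // header key/value pairs; values may be null
)

// Layout (an assumption of this sketch): timestamp, value length, value bytes,
// header count, then for each header: key, value length (-1 for null), value bytes.
fun encode(record: StoredRecord): ByteArray {
    val bytes = ByteArrayOutputStream()
    DataOutputStream(bytes).use { out ->
        out.writeLong(record.timestamp)
        out.writeInt(record.valueBytes.size)
        out.write(record.valueBytes)
        out.writeInt(record.headers.size)
        for ((key, value) in record.headers) {
            out.writeUTF(key)
            out.writeInt(value?.size ?: -1)
            if (value != null) out.write(value)
        }
    }
    return bytes.toByteArray()
}

// Reads the fields back in the same order they were written.
fun decode(data: ByteArray): StoredRecord {
    DataInputStream(ByteArrayInputStream(data)).use { input ->
        val timestamp = input.readLong()
        val valueBytes = ByteArray(input.readInt()).also { input.readFully(it) }
        val headers = List(input.readInt()) {
            val key = input.readUTF()
            val size = input.readInt()
            val value = if (size < 0) null else ByteArray(size).also { input.readFully(it) }
            key to value
        }
        return StoredRecord(valueBytes, timestamp, headers)
    }
}
```

To use this as a state store serde, you would wrap the two functions in a Serializer and a Deserializer and combine them with Serdes.serdeFrom(serializer, deserializer). In the processor, you would build the envelope from record.value(), record.timestamp() and record.headers() before calling put. Since the envelope carries its own timestamp, a plain key-value store might be enough instead of a timestamped one.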

Thanks for your reply :slight_smile: Unfortunately, in this specific case, creating a new Avro message type with headers is off the table. We’re taking a look at the other options you mentioned, though.