Hi all
We’re in the process of adding Kafka headers for traceability in our projects. However, we’re struggling to preserve those headers when storing Avro records in state stores. Does anyone have pointers on how we could achieve this? Any input is highly appreciated.
Rough draft of the application we have in mind:
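// Topology example (pseudocode):
val builder = StreamsBuilder()
builder.addStateStore(
    Stores.timestampedKeyValueStoreBuilder(
        Stores.persistentTimestampedKeyValueStore(STORE_NAME),
        Serdes.String(),
        // How can we find a Serde that serializes/deserializes an object containing both the headers and the AvroRecord?
        // Note: a manually created SpecificAvroSerde still needs configure(...) with the schema registry settings.
        SpecificAvroSerde<AvroRecord>()
    )
)
// The store name is passed to process() so that the processor is connected to the store.
builder.stream(Consume...).process(ProcessorSupplier { StoreRecordProcessor() }, STORE_NAME).to(Produce...)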
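// StoreRecordProcessor example (assumed to extend ContextualProcessor<String, AvroRecord, String, AvroRecord>):
private lateinit var store: TimestampedKeyValueStore<String, AvroRecord>
override fun init(context: ProcessorContext<String, AvroRecord>) {
    super.init(context)
    store = context.getStateStore(STORE_NAME)
}
override fun process(record: Record<String, AvroRecord>) {
    // A timestamped store holds ValueAndTimestamp<V>, so the value has to be wrapped before put().
    // How could we store, for example, the whole Record object (including its headers) instead of only its value?
    store.put("test", ValueAndTimestamp.make(record.value(), record.timestamp()))
}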
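One possible direction, sketched here under several assumptions rather than as a confirmed solution: store an envelope that carries the header key/value pairs next to the already-serialized Avro bytes. The names `Envelope`, `encodeEnvelope` and `decodeEnvelope` are hypothetical, and the length-prefixed byte layout is an arbitrary choice for illustration. The payload is assumed to be the bytes produced by the existing Avro serializer, and the headers are assumed to be copied from `record.headers()` in the processor.

```kotlin
import java.nio.ByteBuffer

// Hypothetical envelope: header pairs plus the Avro payload as raw bytes.
// Header values are nullable because Kafka allows a header with a null value.
class Envelope(val headers: List<Pair<String, ByteArray?>>, val payload: ByteArray)

// Layout (arbitrary, for illustration only):
// [headerCount][keyLen][key][valueLen or -1][value]...[payloadLen][payload]
fun encodeEnvelope(e: Envelope): ByteArray {
    val keys = e.headers.map { it.first.toByteArray(Charsets.UTF_8) }
    var size = 4 // header count
    e.headers.forEachIndexed { i, h -> size += 4 + keys[i].size + 4 + (h.second?.size ?: 0) }
    size += 4 + e.payload.size
    val buf = ByteBuffer.allocate(size)
    buf.putInt(e.headers.size)
    e.headers.forEachIndexed { i, h ->
        buf.putInt(keys[i].size).put(keys[i])
        val v = h.second
        // A length of -1 marks a null header value.
        if (v == null) buf.putInt(-1) else buf.putInt(v.size).put(v)
    }
    buf.putInt(e.payload.size).put(e.payload)
    return buf.array()
}

fun decodeEnvelope(bytes: ByteArray): Envelope {
    val buf = ByteBuffer.wrap(bytes)
    val count = buf.getInt()
    val headers = ArrayList<Pair<String, ByteArray?>>(count)
    repeat(count) {
        val key = ByteArray(buf.getInt()).also { buf.get(it) }
        val len = buf.getInt()
        val value = if (len < 0) null else ByteArray(len).also { buf.get(it) }
        headers.add(String(key, Charsets.UTF_8) to value)
    }
    val payload = ByteArray(buf.getInt()).also { buf.get(it) }
    return Envelope(headers, payload)
}
```

To use this as the value Serde of the state store, the two functions could be wrapped in a `Serializer<Envelope>` and a `Deserializer<Envelope>` and combined with `Serdes.serdeFrom(serializer, deserializer)`. The Avro payload would then be converted with the existing `SpecificAvroSerde` before wrapping and after unwrapping.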