Hi,
I'm implementing a Kafka Streams application that uses CloudEvents. The value is built as a CloudEvent at one point in the topology, but the message that actually gets produced is not in CloudEvent format.
public void initKafkaStream() {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    PojoCloudEventDataMapper<TicketEvent> ticketEventMapper = PojoCloudEventDataMapper.from(objectMapper, TicketEvent.class);
    // Read the raw ticket events as CloudEvents
    KStream<String, CloudEvent> rawTicketStream = streamsBuilder.stream(rawTicketEvent, Consumed.with(Serdes.String(), cloudEventSerde));
    rawTicketStream
        .mapValues(e -> convertToPojo(e, ticketEventMapper))
        .filter((k, v) -> v != null)
        .groupByKey()
        .aggregate(
            AggregatedTicketEvent::new,
            (key, val, agg) -> doAggregation(agg, val),
            Materialized
                .<String, AggregatedTicketEvent, KeyValueStore<Bytes, byte[]>>as("aggregatedTicket")
                .withValueSerde(aggregatedTicketEventSerde)
                .withLoggingDisabled()
        )
        // Wrap each aggregation result in a new CloudEvent
        .mapValues(result -> {
            try {
                return CloudEventBuilder.v1()
                    .withId(UUID.randomUUID().toString())
                    .withType("ticket_update")
                    .withSource(sourceTemplate.expand(result.getCurrent().getId()))
                    .withTime(result.getMeta().getOccurredAt())
                    .withData(objectMapper.writeValueAsBytes(result))
                    .withDataContentType("application/json")
                    .build();
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        })
        .toStream()
        // Write to the output topic using the CloudEvent serde
        .to(aggregatedTicketEvent, Produced.with(Serdes.String(), cloudEventSerde));
    streams = new KafkaStreams(streamsBuilder.build(streamsConfig), streamsConfig);
    streams.setUncaughtExceptionHandler(ex -> StreamThreadExceptionResponse.REPLACE_THREAD);
    streams.start();
}
Based on the logs, I see that this line:
streams = new KafkaStreams(streamsBuilder.build(streamsConfig), streamsConfig);
creates a producer that uses ByteArraySerializer, even though we are providing the CloudEventSerializer.
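For reference, here is a minimal sketch of how a serde such as cloudEventSerde could be built from the CloudEvents Kafka serializer and deserializer. The class name CloudEventSerdeFactory and the choice of structured JSON encoding are assumptions for illustration, not taken from the code above. By default the serializer uses binary mode, which writes the event attributes as ce_* record headers and only the data as the record value. The sketch also assumes the cloudevents-kafka and cloudevents-json-jackson artifacts are on the classpath.

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventDeserializer;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the original code.
public final class CloudEventSerdeFactory {

    private CloudEventSerdeFactory() {
    }

    public static Serde<CloudEvent> structuredJsonSerde() {
        Map<String, Object> config = new HashMap<>();
        // Assumption: structured mode is wanted, so the whole event is
        // written as a JSON envelope in the record value.
        config.put(CloudEventSerializer.ENCODING_CONFIG, Encoding.STRUCTURED);
        config.put(CloudEventSerializer.EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);

        CloudEventSerializer serializer = new CloudEventSerializer();
        serializer.configure(config, false); // false = used for record values

        CloudEventDeserializer deserializer = new CloudEventDeserializer();
        deserializer.configure(config, false);

        return Serdes.serdeFrom(serializer, deserializer);
    }
}
```

With a serde built this way, Produced.with(Serdes.String(), CloudEventSerdeFactory.structuredJsonSerde()) would be expected to put the full CloudEvent JSON in the record value. Kafka Streams applies the serde itself and then hands raw bytes to its internal producer, so seeing ByteArraySerializer in the producer config log is expected either way.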
Could someone please explain why the events are not produced in CloudEvent format?
Thanks in advance