KafkaStream producer isn't compatible with CloudEvents

Hi,

I’m implementing KafkaStream to use CloudEvents, it formats at some point, but when the message is produced doesn’t get the CloudEvent message format.

public void initKafkaStream() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        PojoCloudEventDataMapper<TicketEvent> ticketEventMapper = PojoCloudEventDataMapper.from(objectMapper, TicketEvent.class);
        KStream<String, CloudEvent> rawTicketStream = streamsBuilder.stream(rawTicketEvent, Consumed.with(Serdes.String(), cloudEventSerde));

        rawTicketStream
                .mapValues(e -> convertToPojo(e, TicketEventMapper))
                .filter((k, v) -> v != null)
                .groupByKey()
                .aggregate(
                        AggregatedTicketEvent::new,
                        (key, val, agg) -> doAggregation(agg, val),
                        Materialized
                                .<String, AggregatedTicketEvent, KeyValueStore<Bytes, byte[]>>as("aggregatedTicket")
                                .withValueSerde(aggregatedTicketEventSerde)
                                .withLoggingDisabled()
                )
                .mapValues(result -> {
                    try {
                        return CloudEventBuilder.v1()
                                .withId(UUID.randomUUID().toString())
                                .withType("ticket_update")
                                .withSource(sourceTemplate.expand(result.getCurrent().getId()))
                                .withTime(result.getMeta().getOccurredAt())
                                .withData(objectMapper.writeValueAsBytes(result))
                                .withDataContentType("application/json")
                                .build();
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                })
                .toStream()
                .to(aggregatedTicketEvent, Produced.with(Serdes.String(), cloudEventSerde));

        streams = new KafkaStreams(streamsBuilder.build(streamsConfig), streamsConfig);
        streams.setUncaughtExceptionHandler(ex -> StreamThreadExceptionResponse.REPLACE_THREAD);

        streams.start();
    }

Based on logs I see that this line:

streams = new KafkaStreams(streamsBuilder.build(streamsConfig), streamsConfig);

is using serializer as ByteArraySerializer while we are providing the CloudEventSerializer.

Could someone please guide me here why events are not produced in CloudEvent format?

Thanks in advance

Hi guys,
I just found the solution regarding the issue and posted the answer in SO: java - Apache Kafka - Implementing a KTable and producing event using CloudEvent - Stack Overflow

Best,