Import topic to store with simple processing

Hi,
As part of an application I'm developing (Java, Spring Boot, Spring Kafka, Spring Reactive) I need to import a couple of Kafka topics, join the data, and expose it in a streaming fashion over WebSockets. The topics contain the whole set of data at all times, so as I see it the application doesn't need to keep any state: if the app goes down, the data will simply be re-imported in full.
The data is in protobuf format, so it needs some parsing before it can be used in the application.

Someone explained to me that this would be doable using Kafka Streams. I have experience with Kafka, but no experience with Kafka Streams.

I first set out to solve the simplest case: importing one topic and being able to query the data through a REST interface.

I've been scanning the Internet for examples, but most of them are more advanced than my simple case.
I've set up some Kafka configuration that seems to work (shown after the bean below) and created a topic-importer bean like this:

@Component
public class TopicProcessor {

    @Autowired
    void buildPipeline(StreamsBuilder streamsBuilder) {

        KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("mystore");

        Materialized<Object, Object, KeyValueStore<Bytes, byte[]>> mat2 =
                Materialized.as(storeSupplier);

        streamsBuilder
                .stream("my-topic", Consumed.with(Serdes.String(), Serdes.ByteArray()))
                .mapValues(l -> parseProtobuf(l))
                .filter((k, v) -> Objects.nonNull(v))
                .map((key, value) -> KeyValue.pair((Object) value.getId().getValue(), (Object) value))
                .toTable(mat2);
    }
}
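
For reference, my Kafka Streams configuration looks roughly like this (the bootstrap address is a placeholder; the application id "user" is what shows up as the prefix of the internal topic in the error below):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        // The application id is used as the prefix for all internal topic names.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}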

However, the import fails with:

Not authorized to access topics: [user-KSTREAM-TOTABLE-0000000004-repartition]

  1. How do I solve this? Can I avoid it somehow, since I don't think I need the backing repartition topic?
  2. As you can tell, I cast the key and value to Object; this is because I can't get it to work otherwise. How can I store the <key, value> pair as <String, MyObject>?

I hope you don't think this is too simple a question. I've tried to solve it on my own, but I simply can't get it to work.

Any input is welcome.
Best regards

@jiinxx Your use of the map function prior to calling toTable marks the stream for data re-partitioning, and that's why Kafka Streams is attempting to work with the underlying 'repartition' topic. See the Streams reference docs here: KStream (kafka 3.1.1 API). It seems that your application is not authorized to create, read, or write that topic, so you may need to verify the ACLs assigned to your application. Here is some documentation on ACLs for Streams apps: Kafka Streams Security | Confluent Documentation
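
On question 1: the repartition topic is only needed because map() changes the key. If "my-topic" already happens to be keyed by the same id that you extract in map(), you can drop the re-keying step entirely, and toTable() will not create a repartition topic. A minimal sketch of that idea, reusing the names from your snippet:

streamsBuilder
        .stream("my-topic", Consumed.with(Serdes.String(), Serdes.ByteArray()))
        .mapValues(bytes -> parseProtobuf(bytes)) // value-only transform, key untouched
        .filter((k, v) -> Objects.nonNull(v))     // drop values that failed to parse
        // no map()/selectKey() before toTable(), so no repartition topic is needed
        .toTable(Materialized.as(Stores.persistentKeyValueStore("mystore")));

If you do need to re-key (the usual case when the id only lives inside the value), the repartition topic is unavoidable, and fixing the ACLs is the way to go.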

For protobuf and data types: I can't see the return type of the parseProtobuf function, so I can't tell what type the value objects have after that invocation. Maybe this documentation on working with Serdes in Kafka Streams can help (there is a protobuf section): Kafka Streams Data Types and Serialization | Confluent Documentation
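
On question 2: if you give Materialized explicit generics and serdes, the Object casts disappear. Since your values are protobuf messages, you can build a Serde from the message's own toByteArray() and your existing parse helper. A sketch, assuming MyObject is the protobuf-generated class from your snippet and that getId().getValue() returns a String:

Serde<MyObject> myObjectSerde = Serdes.serdeFrom(
        (topic, value) -> value == null ? null : value.toByteArray(),   // protobuf messages serialize themselves
        (topic, bytes) -> bytes == null ? null : parseProtobuf(bytes)); // reuse your existing parser

Materialized<String, MyObject, KeyValueStore<Bytes, byte[]>> materialized =
        Materialized.<String, MyObject, KeyValueStore<Bytes, byte[]>>as("mystore")
                .withKeySerde(Serdes.String())
                .withValueSerde(myObjectSerde);

streamsBuilder
        .stream("my-topic", Consumed.with(Serdes.String(), Serdes.ByteArray()))
        .mapValues(bytes -> parseProtobuf(bytes))
        .filter((k, v) -> Objects.nonNull(v))
        .map((key, value) -> KeyValue.pair(value.getId().getValue(), value)) // typed now, no casts
        .toTable(materialized);

As far as I know, the serdes set on Materialized are also used for the internal repartition and changelog topics, so the typed values survive the round trip. Once the table is running, you can serve your REST interface from the store with an interactive query (kafkaStreams would come from Spring's StreamsBuilderFactoryBean#getKafkaStreams(); someId is whatever key you look up):

ReadOnlyKeyValueStore<String, MyObject> store = kafkaStreams.store(
        StoreQueryParameters.fromNameAndType("mystore", QueryableStoreTypes.keyValueStore()));
MyObject value = store.get(someId); // point lookup; store.all() iterates the whole table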

Hope this helps some
