Hi,
As part of an application that Im developing (Java, SpringBoot, SpringKafka, Spring Reactive) I need to import a couple of kafka-topics, join the data and expose it in a streaming fashion using WS. The topics contains the whole set of data at all times so the application doesnt need to keep any state as I see it - if the app goes down the data will be re-imported in total into the application.
The data is in protobuf-format so It needs some parsing to be used in the application
Someone explained to me that this would be doable using Kafka Streams. I have experience of Kafka, but no experience of Kafka Streams
I first set out to solve the simplest case - importing one topic, and being able to query the data using a REST-interface.
Ive been scanning the Internet for examples but most of them are more advanced than the my simple case.
Ive setup some kafka-configuration that seem to work and I have created a topic-importer-bean like this:
@Component
public class TopicProcessor {
@Autowired
void buildPipeline(StreamsBuilder streamsBuilder) {
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("mystore");
Materialized<Object, Object, KeyValueStore<Bytes, byte[]>> mat2 =
Materialized.as(storeSupplier);
streamsBuilder
.stream("my-topic", Consumed.with(Serdes.String(), Serdes.ByteArray()))
.mapValues(l -> parseProtobuf(l))
.filter((k, v) -> Objects.nonNull(v))
.map(((key, value) -> KeyValue.pair((Object) value.getId().getValue(), (Object) value)))
.toTable(mat2);
}
}
However, the import fails with :
Not authorized to access topics: [user-KSTREAM-TOTABLE-0000000004-repartition]
- How do I solve this? Can I avoid it some how as I dont need the backing into topic?
- As you can tell I cast the key and value to Object - this is because I cant it to work otherwise? How can store the <key,value> as <String, MyObject>?
Hope you dont think this a to simple question. Ive tried to solve it on my own but I simply cant get it to work.
Any input is welcome
Best Regards