Hi @mjsax
Thanks for your reply.
I tried two solutions for the News <-> Images relation.
A news article can contain multiple images. The author can re-publish the article with a new set of images, so I should always get only the latest images for a news article.
Solution 1:
The solution below aggregates all the images across multiple news messages from the topic, which is not what I need for my use case. Is there an easy workaround to always get only the latest images of a news article?
KTable<String, NewsImages> newsImages = news.flatMap((newsId, news1) -> {
        List<KeyValue<String, News>> flattenedRecords = new ArrayList<>();
        for (String imageId : news1.getImageIds()) {
            News news9 = new News();
            news9.setId(news1.getId());
            news9.setTitle("Title" + imageId);
            news9.setImageId(imageId);
            flattenedRecords.add(new KeyValue<>(newsId + ":" + imageId, news9));
        }
        return flattenedRecords;
    })
    .toTable(Materialized.with(Serdes.String(), newsSerde))
    .join(images, News::getImageId, (news2, image2) -> {
        news2.setImage(image2);
        return news2;
    }, Materialized.<String, News, KeyValueStore<Bytes, byte[]>>as("NEWS_WITH_IMAGES")
        .withKeySerde(Serdes.String())
        .withValueSerde(newsSerde))
    .groupBy((key, groupedNews) -> KeyValue.pair(groupedNews.getId(), groupedNews), Grouped.with(Serdes.String(), newsSerde))
    .aggregate(NewsImages::new, (key, news3, aggregator) -> {
        aggregator.setNewsId(news3.getId());
        aggregator.addImage(news3.getImage());
        return aggregator;
    }, (key, value, aggregator) -> aggregator, Materialized.with(Serdes.String(), newsImagesSerde));
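To make the accumulation I'm seeing concrete, here is a rough plain-Java sketch with no Kafka Streams involved. The class `LatestImagesSketch` and its methods are hypothetical names made up for illustration. As far as I can tell, the flattened `newsId:imageId` keys from an earlier publish are never deleted from the table, and my subtractor above returns the aggregate unchanged, so old images stay in the result. Keeping one entry per news id and replacing it on every publish gives the "latest only" behaviour I'm after.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestImagesSketch {

    // Mimics the flattened table: key "newsId:imageId" -> imageId.
    // Upserts never remove keys left over from an earlier publish.
    private final Map<String, String> flattened = new LinkedHashMap<>();

    // Mimics a table keyed by newsId only: each publish replaces the whole list.
    private final Map<String, List<String>> latest = new LinkedHashMap<>();

    public void publish(String newsId, List<String> imageIds) {
        for (String imageId : imageIds) {
            flattened.put(newsId + ":" + imageId, imageId);
        }
        latest.put(newsId, new ArrayList<>(imageIds));
    }

    // Collects every image ever flattened for this news id (stale ones included).
    public List<String> accumulatedImages(String newsId) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> entry : flattened.entrySet()) {
            if (entry.getKey().startsWith(newsId + ":")) {
                result.add(entry.getValue());
            }
        }
        return result;
    }

    // Returns only the images from the most recent publish.
    public List<String> latestImages(String newsId) {
        return latest.getOrDefault(newsId, List.of());
    }

    public static void main(String[] args) {
        LatestImagesSketch sketch = new LatestImagesSketch();
        sketch.publish("n1", List.of("img1", "img2"));
        sketch.publish("n1", List.of("img3")); // re-publish with a new image set
        System.out.println(sketch.accumulatedImages("n1")); // [img1, img2, img3]
        System.out.println(sketch.latestImages("n1"));      // [img3]
    }
}
```

This is only a model of the behaviour, not of the actual topology: it ignores the join with the images table and the serdes entirely.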
Solution 2:
The solution below is simple and works perfectly for my use case, but I’m not sure whether it is the right thing to do. Is it efficient for my use case?
KStream<String, News> enrichedImages = news.transformValues(() -> new ValueTransformer<News, News>() {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext processorContext) {
        this.context = processorContext;
    }

    @Override
    public News transform(News inNews) {
        Set<Image> newsImages = inNews.getImageIds().stream().map(imageId -> {
            ReadOnlyKeyValueStore<String, Image> imageIdImageMap = getTopologyStream()
                .store("IMAGES_TABLE",
                    QueryableStoreTypes.<String, Image>keyValueStore());
            return imageIdImageMap.get(imageId);
        }).collect(Collectors.toSet());
        inNews.setImages(newsImages);
        return inNews;
    }

    @Override
    public void close() {
    }
});