Stream foreign key join with multiple foreign keys

First of all, thank you very much for providing a solution for foreign-key joins.

I’m wondering whether there is an easy way to do a foreign-key join with multiple foreign keys, for example:

Employee topic

{
"ID" : 1,
"Name" : "Jay",
"DepartmentIds": [2,3,4]
}

Department topic 

{
"DepartmentId": 2,
"Name": "Computers"
}
{
"DepartmentId": 3,
"Name": "Electronics"
}
{
"DepartmentId": 4,
"Name": "Mechanical"
}

should be transformed into

Joined Output

{
"ID" : 1,
"Name" : "Jay",
"DepartmentIds": [2,3,4],
"Departments": [
   {
      "DepartmentId" : 2,
      "Name" : "Computers"
   },
   {
      "DepartmentId" : 3,
      "Name" : "Electronics"
   },
   {
      "DepartmentId" : 4,
      "Name" : "Mechanical"
   }]
}

You would need to apply some workarounds:

  1. split each employee record into multiple records, one per department ID (i.e., read the topic as a KStream and apply flatMapValues() or flatMap())
  2. join each of these records to its department
  3. aggregate by employee ID to reassemble the original record
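To make the data flow of the three steps concrete, here is a minimal in-memory sketch in plain Java. It is not Kafka Streams code: the records `Employee`, `Department`, `EmployeeDepartment`, and `EnrichedEmployee` and the methods `split()` and `joinAndAggregate()` are hypothetical names chosen for illustration, a `Map` stands in for the department table, and department IDs without a match are dropped to mimic inner-join semantics. It assumes Java 16+ (records).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of split -> join -> aggregate (not Kafka Streams code).
public class SplitJoinAggregate {

    record Employee(int id, String name, List<Integer> departmentIds) {}
    record Department(int departmentId, String name) {}
    // One "sub record" per (employee, department) pair, produced by step 1.
    record EmployeeDepartment(int employeeId, String employeeName, int departmentId) {}
    record EnrichedEmployee(int id, String name, List<Integer> departmentIds, List<Department> departments) {}

    // Step 1: split one employee into one record per department ID (like flatMap()).
    static List<EmployeeDepartment> split(Employee e) {
        List<EmployeeDepartment> out = new ArrayList<>();
        for (int deptId : e.departmentIds()) {
            out.add(new EmployeeDepartment(e.id(), e.name(), deptId));
        }
        return out;
    }

    // Steps 2 and 3: look up each sub record's department, then regroup by employee ID.
    static Map<Integer, EnrichedEmployee> joinAndAggregate(List<EmployeeDepartment> subRecords,
                                                           Map<Integer, Department> departments) {
        Map<Integer, EnrichedEmployee> result = new LinkedHashMap<>();
        for (EmployeeDepartment sub : subRecords) {
            Department dept = departments.get(sub.departmentId());
            if (dept == null) {
                continue; // inner-join semantics: drop sub records without a matching department
            }
            EnrichedEmployee agg = result.computeIfAbsent(sub.employeeId(),
                id -> new EnrichedEmployee(id, sub.employeeName(), new ArrayList<>(), new ArrayList<>()));
            agg.departmentIds().add(sub.departmentId());
            agg.departments().add(dept);
        }
        return result;
    }

    public static void main(String[] args) {
        Employee jay = new Employee(1, "Jay", List.of(2, 3, 4));
        Map<Integer, Department> departments = Map.of(
            2, new Department(2, "Computers"),
            3, new Department(3, "Electronics"),
            4, new Department(4, "Mechanical"));
        System.out.println(joinAndAggregate(split(jay), departments).get(1));
    }
}
```

In a real topology, step 1 maps to flatMap(), step 2 to a join against the department table, and step 3 to groupBy() followed by aggregate().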

For (1), it might actually be better to use a stateful transform() that keeps the list of department IDs per employee in a state store. This allows you to react to changes in the list of departments: if the list changes, you can compute the removed and added departments by comparing the new list with the old one. (Deletes could still be tricky downstream.)
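The diffing idea can be sketched in plain Java, with a `HashMap` standing in for the state store that a stateful transform() would use. This is a hypothetical model, not the Kafka Streams API: the class `DepartmentDiff`, the `Output` record, and the `employeeId:departmentId` key format are assumptions for illustration. Removed departments produce tombstones (null values) for their combined key; added departments produce new sub records; a null input list models a deleted employee.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the stateful transform() for step (1): a HashMap stands in
// for the state store that remembers each employee's last set of department IDs.
public class DepartmentDiff {

    // Output record: combined key plus the department ID, or null for a tombstone.
    record Output(String key, Integer departmentId) {}

    private final Map<Integer, Set<Integer>> lastDepartments = new HashMap<>();

    // newDepartmentIds == null models a deleted employee (an input tombstone).
    List<Output> process(int employeeId, List<Integer> newDepartmentIds) {
        Set<Integer> newSet = newDepartmentIds == null ? Set.of() : new LinkedHashSet<>(newDepartmentIds);
        Set<Integer> oldSet = lastDepartments.getOrDefault(employeeId, Set.of());
        List<Output> out = new ArrayList<>();
        for (Integer deptId : oldSet) {
            if (!newSet.contains(deptId)) {
                out.add(new Output(employeeId + ":" + deptId, null)); // removed: emit tombstone
            }
        }
        for (Integer deptId : newSet) {
            if (!oldSet.contains(deptId)) {
                out.add(new Output(employeeId + ":" + deptId, deptId)); // added: emit new sub record
            }
        }
        if (newSet.isEmpty()) {
            lastDepartments.remove(employeeId); // nothing left to remember for this employee
        } else {
            lastDepartments.put(employeeId, newSet);
        }
        return out;
    }

    public static void main(String[] args) {
        DepartmentDiff diff = new DepartmentDiff();
        System.out.println(diff.process(1, List.of(2, 3, 4)));
        System.out.println(diff.process(1, List.of(3, 5)));
        System.out.println(diff.process(1, null));
    }
}
```

This sketch emits only added and removed departments. If other employee fields (for example the name) must also reach the joined result, re-emitting the unchanged department records as well would be the simpler choice.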

For (2), it depends on which semantics you need. You could try a stream-table join; however, I am not sure it’s the best fit (also with regard to handling deletes, i.e., department removals). A (foreign-key) table-table join is still the best option: for this case, you could convert the stream into a table via KStream#toTable(). Note that you would need to change the primary key in (1) to a combined key of employee ID and department ID to ensure that each “sub record” is contained in the table.
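Why the combined key matters can be seen with a tiny model: a KTable keeps only the latest value per key, much like `Map.put()`. In this hypothetical sketch, `toTable()` is an invented helper (not `KStream#toTable()`), and `employeeId:departmentId` is just one possible key encoding.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a KTable keeps only the latest value per key, like Map.put().
public class CombinedKeyDemo {

    static Map<String, Integer> toTable(int employeeId, List<Integer> departmentIds, boolean combinedKey) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (int deptId : departmentIds) {
            // Key by employee ID only, or by the combined "employeeId:departmentId".
            String key = combinedKey ? employeeId + ":" + deptId : String.valueOf(employeeId);
            table.put(key, deptId); // a later record with the same key overwrites the earlier one
        }
        return table;
    }

    public static void main(String[] args) {
        List<Integer> depts = List.of(2, 3, 4);
        System.out.println(toTable(1, depts, false)); // {1=4}
        System.out.println(toTable(1, depts, true));  // {1:2=2, 1:3=3, 1:4=4}
    }
}
```

With the employee ID alone as the key, only the last department survives; with the combined key, every sub record stays in the table and can be joined individually.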


Hi @mjsax

Thanks for your reply.

I tried two solutions for a News <-> Images relation.
A news article can contain multiple images, and the author can republish the article with a new set of images (so I should always get the latest images for each news article).

Solution 1:
The solution below aggregates the images from all messages for the same news article in the topic, including older versions, which is not what I need. Is there an easy workaround to always get only the latest images of a news article?

    KTable<String, NewsImages> newsImages = news.flatMap((newsId,news1) -> {
      List<KeyValue<String,News>> flattenedRecords = new ArrayList<>();
      for (String imageId : news1.getImageIds()) {
        News news9 = new News();
        news9.setId(news1.getId());
        news9.setTitle("Title"+imageId);
        news9.setImageId(imageId);
        flattenedRecords.add(new KeyValue<>(newsId+":"+imageId,news9));
      }
      return flattenedRecords;
    })
      .toTable(Materialized.with(Serdes.String(), newsSerde))
      .join(images, News::getImageId, (news2, image2) -> {
        news2.setImage(image2);
        return news2;
      }, Materialized.<String, News, KeyValueStore<Bytes, byte[]>>
        as("NEWS_WITH_IMAGES")
        .withKeySerde(Serdes.String())
        .withValueSerde(newsSerde))
      .groupBy((key, groupedNews) -> KeyValue.pair(groupedNews.getId(), groupedNews),Grouped.with(Serdes.String(),newsSerde))
      .aggregate(NewsImages::new, (key, news3, aggregator) -> {
        aggregator.setNewsId(news3.getId());
        aggregator.addImage(news3.getImage());
        return aggregator;
      }, (key, value, aggregator) -> aggregator, // no-op subtractor: old values are never removed from the aggregate
        Materialized.with(Serdes.String(), newsImagesSerde));
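One likely contributor to the accumulation in Solution 1 is the no-op subtractor `(key, value, aggregator) -> aggregator`: when a row of the upstream table changes, `KGroupedTable#aggregate()` passes the row's old value to the subtractor and its new value to the adder. The plain-Java model below is hypothetical (strings stand in for `News`/`NewsImages`, and `AdderSubtractorDemo` is an invented class), but it shows the difference. It also shows that an image is only removed when a tombstone for its `newsId:imageId` key arrives, and the `flatMap()` above never emits such tombstones for images dropped from a republished article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical plain-Java model of KGroupedTable#aggregate(initializer, adder, subtractor):
// when an upstream row changes, its OLD value goes to the subtractor, its NEW value to the adder.
public class AdderSubtractorDemo {

    private final Map<String, String> upstream = new HashMap<>();        // "newsId:imageId" -> image value
    private final Map<String, List<String>> aggregate = new HashMap<>(); // newsId -> images (like NewsImages)
    private final BiConsumer<List<String>, String> subtractor;

    AdderSubtractorDemo(BiConsumer<List<String>, String> subtractor) {
        this.subtractor = subtractor;
    }

    // newValue == null models a tombstone for the "newsId:imageId" row.
    void upsert(String newsId, String imageId, String newValue) {
        String key = newsId + ":" + imageId;
        String oldValue = (newValue == null) ? upstream.remove(key) : upstream.put(key, newValue);
        List<String> images = aggregate.computeIfAbsent(newsId, id -> new ArrayList<>());
        if (oldValue != null) {
            subtractor.accept(images, oldValue); // retract the previous value first
        }
        if (newValue != null) {
            images.add(newValue); // then apply the adder
        }
    }

    List<String> images(String newsId) {
        return aggregate.getOrDefault(newsId, List.of());
    }

    public static void main(String[] args) {
        AdderSubtractorDemo noOp = new AdderSubtractorDemo((images, old) -> { });
        AdderSubtractorDemo removing = new AdderSubtractorDemo((images, old) -> images.remove(old));
        for (AdderSubtractorDemo demo : List.of(noOp, removing)) {
            demo.upsert("n1", "img1", "img1-v1");
            demo.upsert("n1", "img1", "img1-v2"); // same row updated, e.g. article republished
            System.out.println(demo.images("n1")); // noOp: [img1-v1, img1-v2], removing: [img1-v2]
        }
    }
}
```

Fixing the subtractor alone would not be enough for images that disappear from a republished article; those rows also need tombstones, for example from a diffing transform like the one described for step (1).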

Solution 2:
The solution below is simple and works perfectly for my use case, but I’m not sure whether it’s the right approach. Is it efficient for my use case?

    KStream<String, News> enrichedImages = news.transformValues(() -> new ValueTransformer<News, News>() {
      private ProcessorContext context;
      @Override
      public void init(ProcessorContext processorContext) {
        this.context = processorContext;
      }
      @Override
      public News transform(News inNews) {
        // look the store up once per record instead of once per image ID;
        // getTopologyStream() presumably returns the running KafkaStreams instance
        ReadOnlyKeyValueStore<String, Image> imageIdImageMap = getTopologyStream()
          .store("IMAGES_TABLE", QueryableStoreTypes.<String, Image>keyValueStore());
        Set<Image> newsImages = inNews.getImageIds().stream()
          .map(imageIdImageMap::get)
          .filter(image -> image != null) // skip image IDs not (yet) present in the table
          .collect(Collectors.toSet());
        inNews.setImages(newsImages);
        return inNews;
      }
      @Override
      public void close() {
      }
    });

There is nothing wrong with solution 2. If it does the job, you should use it.
