Are KTable-KTable joins a good fit for CDC?

Hi,

I’m trying to build a CDC pipeline using Kafka Connect and Kafka.
The objective is to sync any changes from Postgres to Elasticsearch. To stream the change log I’m using the Debezium plugin, and to sink it into Elasticsearch I’m using the Elasticsearch sink connector.

But I want to merge everything into one complete doc, as an individual index for each table doesn’t make much sense. For this I’m using Kafka Streams, specifically a KTable-KTable join.

But there are 9 tables in total, and a changelog topic gets created for each foreign-key join of KTables. That worries me if I were to join all 9 of these tables (now topics).

  • Will it scale, how efficient will this be?

  • Will it consume a lot of disk space?

  • Or will the data in these intermediate Kafka topics (changelog or repartition) be stored only temporarily and purged after a certain amount of time?

Is there a better way to handle this situation? I want to get a complete doc out of all the tables and sink it into ES.

Thanks :slight_smile:

This is not the approach I would use for CDC from an RDBMS into a document search engine (Elasticsearch). I would use a single-aggregate approach. This requires that the entire document be stored in a changelog topic (for that aggregate), but it doesn’t require every table in the RDBMS to be a KTable.

  • read the changes from each source in as a KStream

  • map to a common aggregate format that can properly understand “partial” documents.

  • merge those partial documents into a common topic keyed by the parent “document_id”

  • aggregate those deltas into the existing aggregate

  • emit that aggregate to a topic that is consumed by elastic sink
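
As a rough illustration of the "aggregate those deltas" step, here is a minimal plain-Java sketch with no Kafka dependency. The class name `DocumentAggregate` and the flat field map are assumptions made for the example, not something from your schema. In Kafka Streams, logic like this would typically sit inside the aggregator function passed to `aggregate()` on the grouped stream of partial documents.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical aggregate: one search document assembled from per-table deltas. */
final class DocumentAggregate {
    // Top-level fields of the document, e.g. "title" -> "some title".
    final Map<String, Object> fields = new HashMap<>();

    /**
     * Applies a partial document. Only the fields present in the delta are
     * overwritten; every other field already in the aggregate is left alone.
     */
    DocumentAggregate merge(Map<String, Object> delta) {
        fields.putAll(delta);
        return this;
    }
}
```

A delta that only carries `title` would replace the title and leave fields contributed by other tables untouched, which is what lets each source table emit just the part of the document it knows about.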

Now there are implementation details not listed above, because I don’t know the data well enough.

  • How to get to the parent document_id from any given CDC table. You may need earlier state stores to accomplish this, but try to avoid synthetic IDs, as you would then need to manage and maintain those in addition to the state you already have.

  • If you have a lot of nested elements, you may not want to assemble them all in the final aggregate, and you may need earlier (intermediate) aggregates in the pipeline.

  • Idempotency is important here; you don’t want to add a child twice because the message was processed twice.
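
One common way to get the idempotency mentioned in the last point is to store children keyed by their primary key instead of appending them to a list. The sketch below is plain Java under that assumption; `ChildCollection` and the `String` row type are hypothetical stand-ins for whatever your child records look like.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical holder for the child rows of one parent document. */
final class ChildCollection {
    // Keyed by the child's primary key, so re-applying the same change is harmless.
    private final Map<Long, String> rowsByPrimaryKey = new TreeMap<>();

    /** Insert or update: processing the same change twice leaves a single entry. */
    void upsert(long primaryKey, String row) {
        rowsByPrimaryKey.put(primaryKey, row);
    }

    /** Delete: removing a child that is already gone is a no-op. */
    void delete(long primaryKey) {
        rowsByPrimaryKey.remove(primaryKey);
    }

    /** Read-only view of the current children. */
    Map<Long, String> rows() {
        return Collections.unmodifiableMap(rowsByPrimaryKey);
    }
}
```

With this shape, a replayed insert for child 7 just overwrites the entry for key 7 rather than producing a duplicate child in the document.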

Other things that could make this a little easier:

  • Elasticsearch has the concept of merge as well; if you can leverage it, you could avoid storing the full document/aggregate in Kafka Streams. I have tried to use this in the past, but the project went in another direction before I could see whether it could be leveraged.

  • if the aggregate needs to be purged, consider a manual tombstoning process instead of windowing, e.g. have a process that examines the aggregates and, for any that are more than 1 year old, sends in a tombstone message to allow it to be purged.

  • back up the aggregate’s changelog topic to a bucket (e.g. S3)

  • build in a restore process to read from that bucket and rebuild aggregates.
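
For the manual tombstoning idea above, the selection part could look roughly like the following plain-Java sketch. It assumes you can see a last-updated timestamp per document_id; the class `TombstoneSweep` and the input map are hypothetical. Actually producing the tombstone (a record with that key and a null value) and wiring it into the topology is left out.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sweep that picks the aggregates old enough to be tombstoned. */
final class TombstoneSweep {
    /**
     * Returns the document ids whose last update is strictly older than maxAge.
     * The caller would then send a tombstone for each returned id.
     */
    static List<String> expiredKeys(Map<String, Instant> lastUpdatedByDocumentId,
                                    Instant now,
                                    Duration maxAge) {
        Instant cutoff = now.minus(maxAge);
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Instant> entry : lastUpdatedByDocumentId.entrySet()) {
            if (entry.getValue().isBefore(cutoff)) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }
}
```

Calling it with `Duration.ofDays(365)` corresponds to the "more than 1 year old" rule; the cutoff is passed in so the sweep can run on whatever schedule suits you.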

Hi @nbuesing ,

Thank you for explaining in such detail.
I also suspected there would be a better approach than the one I mentioned.
I guess coming here was the right call.

Thanks :slightly_smiling_face:
