I have just started diving into Kafka Connect and decided to learn by doing. My main goal is to move data from MongoDB to GreenplumDB (Postgres) using Kafka Connect.
I used the Debezium CDC Connector (with the ExtractNewDocumentState SMT) as a source and the JDBC Connector as a sink.
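For reference, the source side looks roughly like the sketch below; the connection details are placeholders and the exact property names depend on the Debezium version:

# Sketch of the Debezium MongoDB source (placeholder values; older Debezium versions
# use mongodb.hosts / mongodb.name instead of connection.string / topic.prefix)
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.connection.string=mongodb://mongo:27017
topic.prefix=mongo
collection.include.list=mydb.conditions
# Unwrap the Debezium change-event envelope into the plain document
transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState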
The main problem is that MongoDB collections have complex structures. This is what I currently do with the raw topic in ksqlDB, re-serializing it as Avro:

-- Expose the raw topic from the Mongo source connector as a stream with a single string value
CREATE STREAM raw_conditions (records VARCHAR) WITH
  (KAFKA_TOPIC='mongo.conditions', VALUE_FORMAT='KAFKA');

SET 'auto.offset.reset' = 'earliest';

-- Re-serialize the records as Avro so the stream gets a registered schema
CREATE STREAM wrapped_conditions WITH (FORMAT='AVRO') AS
  SELECT * FROM raw_conditions;
I have had such use cases multiple times in the past 1-2 years. Based on the experience I gathered there, I currently see the following variants to solve this:
Build your own connector (or a “normal” consumer plus a JDBC connection to Postgres), with all the things you need to consider when doing that …
Flatten the data out on the Mongo side: create a “staging” model or view in Mongo (if the Mongo source connector can handle views — I never tried) that contains a simpler model and is then used by the source connector.
Use the JDBC sink with a flatten feature (the kafka-connect-jdbc_flatten connector) → I highly advise avoiding this connector. We invested a lot of time in a POC implementation and aborted it after running into several issues. It does not seem to be “production ready”, or is only usable for some specific use cases, and it does not appear to have an active community. The documentation is also hard to understand, and you will end up debugging through the connector just to understand the configuration.
As none of those three variants sounded really “good” to me, I created this kafka-connect-transform-tojsonstring SMT to overcome this in a lightweight way.
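For illustration, wiring it into a JDBC sink looks roughly like the sketch below; the connection settings are placeholders, and please take the exact SMT class and property names from the repo’s README rather than from here:

# Sketch of a JDBC sink using the tojsonstring SMT (placeholder values; check the
# kafka-connect-transform-tojsonstring README for the exact class and property names)
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://greenplum:5432/dwh
connection.user=dwh
connection.password=secret
topics=mongo.conditions
auto.create=true
insert.mode=insert
# Serialize the whole (complex) record value into a single JSON string field
transforms=tojson
transforms.tojson.type=com.github.cedelsb.kafka.connect.smt.Record2JsonStringConverter$Value
transforms.tojson.json.string.field.name=record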
I also think there is one more disadvantage of the SMT solution that you didn’t list above: the SMT only helps you get the data into the DB, as a single field with a JSON string as its value. So you either need some DB-side logic to parse out the information you need, or the DB clients that query this field have to do it. In the end it is an additional “moving part” that needs to be tested and maintained (e.g. when schema changes come up).
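To make that concrete, the DB-side logic could be as simple as a view over the landing table; the table and field names below are just hypothetical:

-- Hypothetical landing table "conditions" with a single JSON-string column "record"
CREATE VIEW conditions_flat AS
SELECT record::jsonb ->> 'id'                     AS id,
       record::jsonb ->> 'status'                 AS status,
       record::jsonb -> 'subject' ->> 'reference' AS subject_ref
FROM conditions;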
I’m also wondering why this seems to be quite a rare issue. I don’t find much about it on the internet, but given how often I run into this problem I would expect a bigger need in the community for official support in the JDBC sink connector.
@an0r0c thanks for sharing this, and for creating the SMT. I just wanted to add on this point…
…I notice this problem crop up pretty often, but usually without a solution. I’m really pleased to see this thread developing here, with concrete solutions and directions for people to follow and investigate.
Hi, @an0r0c.
First of all, I want to say thank you for your transformer. It should definitely be part of the Confluent Platform.
As for this project, I’m doing it for educational purposes only. Almost all of my current work is done with BigQuery and Composer.
At my previous job, I had to analyze data in Postgres, and all the data analysts suffered from irregular data migration from the production Postgres and Mongo databases. As I remember, the IT department used Pentaho for this. That’s why I decided to build a proof of concept for my ex-colleagues showing that Kafka can be a fault-tolerant replacement.
You provided a really good summary of all possible solutions. But some of them can’t be used in my case.
Writing your own connector requires good Java/Scala skills, and not all teams have experienced programmers. For example, I mostly use SQL and sometimes Python.
That’s a very interesting idea. I’ll investigate it.
Trying to build a flat table from the record is definitely the wrong way for me because of the complexity of the data schema.
This is one of the structures I’ve had to handle: Encounter - FHIR v4.3.0
Your SMT is the solution. I hope you’ll keep it working, because I think it will be needed by a lot of people. And maybe one day it’ll be on Confluent Hub.
I agree with you that some DB-side logic to parse the data is needed. But Postgres has a rich set of tools for working with JSON, and once the data is in the DWH you can use quite flexible SQL for that. Of course, first-level flattening would definitely be better, but having the whole JSON in one column is also fine.
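For example, even nested arrays inside the stored JSON can be unnested with plain SQL (the table and field names here are hypothetical):

-- Hypothetical landing table "encounters" with a JSON-string column "record"
SELECT e.record::jsonb ->> 'id' AS encounter_id,
       ident ->> 'system'       AS identifier_system,
       ident ->> 'value'        AS identifier_value
FROM encounters e,
     jsonb_array_elements(e.record::jsonb -> 'identifier') AS ident;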
Wow. That’s amazing. Thank you @an0r0c.
I’ll update my project installation process.
I see that you updated the documentation in your GitHub repo. Cool!
Hi. This topic is already solved thanks to @an0r0c, but in the end I built what I was looking for myself.
I used Christian’s transformer as an example and created yet another one.
It fits the use case described here perfectly: from MongoDB to Postgres, where documents have many first-level fields and just a few nested ones.
Christian’s SMT, on the other hand, is better suited for very complex document hierarchies.