Move data from MongoDB to GreenplumDB (Postgres)

Hello, community.

I have just started diving into Kafka Connect and decided to learn by doing. My main goal is to move data from MongoDB to GreenplumDB (Postgres) using Kafka Connect.
I used the Debezium CDC Connector (with the ExtractNewDocumentState SMT) as a source and the JDBC Connector as a sink.
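
For a bit more context, a minimal source config along these lines looks roughly like the sketch below (the connector name, connection details, collection name and topic prefix are placeholders, and some property names differ between Debezium versions - newer releases use mongodb.connection.string and topic.prefix instead of mongodb.hosts and mongodb.name):

{
  "name": "mongo-source",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/mongo:27017",
    "mongodb.name": "mongo",
    "collection.include.list": "clinic.conditions",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"
  }
}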

The main problem is that the MongoDB collections have complex structures, like:

{
  "diagnosis": [
    {
      "primary": {
        "code": "A30"
      }
    },
    {
      "comorbidity": {
        "code": "A30"
      }
    }
  ]
}

If I use Avro or JSON with schemas enabled, I get:

org.apache.kafka.connect.errors.ConnectException: (STRUCT) type doesn't have a mapping to the SQL database column type.

I had been working on this for a long time until @rmoff advised me to look for answers here.

This thread helped me a lot: Storing topic data in a single column as a JSON in Postgres.
My Connect configs.

So, there are two ways to solve my problem:

  1. Avro Schema with Record2JsonStringConverter SMT;

    1. Source: Debezium CDC -> ExtractNewDocumentState -> AvroConverter (schema.enable=true);
    2. Sink: AvroConverter (schema.enable=true) -> Record2JsonStringConverter -> JDBC Connector;
  2. Raw JSON

    1. Source: Debezium CDC -> ExtractNewDocumentState -> JsonConverter (schema.enable=false);
    2. ksqlDB processing:
       CREATE STREAM raw_conditions (records VARCHAR)
         WITH (KAFKA_TOPIC='mongo.conditions', VALUE_FORMAT='KAFKA');

       SET 'auto.offset.reset' = 'earliest';

       CREATE STREAM wrapped_conditions WITH (FORMAT='AVRO') AS
         SELECT * FROM raw_conditions;
      
    3. Sink: AvroConverter (schema.enable=true) -> Record2JsonStringConverter -> JDBC Connector (a config sketch for this sink follows right after the list);
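
For reference, a sink config for variant 1 could look roughly like this (connector name, topic and connection details are placeholders, and the exact SMT class and option names should be double-checked against the Record2JsonStringConverter README):

{
  "name": "greenplum-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "mongo.conditions",
    "connection.url": "jdbc:postgresql://greenplum:5432/dwh",
    "auto.create": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "tojson",
    "transforms.tojson.type": "com.github.cedelsb.kafka.connect.smt.Record2JsonStringConverter$Value",
    "transforms.tojson.json.string.field.name": "jsonstring"
  }
}

With this the whole record ends up in a single text column (here named jsonstring) of the target table.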

Both of them have some disadvantages:

  1. It relies on a third-party library that is not on Confluent Hub and is supported only by its developer.
  2. It requires one more service (ksqlDB) and increases disk usage.

So, my question is: are there other, simpler and more elegant ways to move data from Mongo to Postgres? Thank you.


Hi,

I have had such use cases multiple times in the past 1-2 years. Based on the experience I gained there, I currently see the following variants to solve this:

  • Build your own connector (or a “normal” consumer + JDBC connection to Postgres), with all the things you need to consider when doing this …

  • Flatten the data out on the Mongo side: create a “staging” model or view in Mongo (if the Mongo connector is able to handle Mongo views - I have never tried it) which is then used by the source connector and which contains a simpler model.

  • Use the JDBC Sink with the flatten feature (kafka-connect-jdbc_flatten connector) → I highly advise avoiding this connector. We invested a lot of time in a POC implementation and aborted it after running into several issues. It does not seem to be “production ready”, or is only usable for some specific use cases. It also does not seem to have any active community, and the documentation is hard to understand, so you will end up debugging through the connector just to understand the configuration.

  • Use the kafka-connect-transform-tojsonstring SMT

As none of the first three variants sounded really “good” to me, I created the kafka-connect-transform-tojsonstring SMT to overcome this in a lightweight way.

I also think there is one more disadvantage of the SMT solution you didn’t list above: the SMT just helps you get the data into the DB => as a single field with a JSON string as its value. So you either need some DB-side logic to parse out the information you need, or the DB clients that query this field need to do it. In the end it is an additional “moving” part which needs to be tested and maintained (e.g. if schema changes come up).

Recently I stumbled over Added support for arrays of primitives in Postgres dialect by Faqa · Pull Request #805 · confluentinc/kafka-connect-jdbc · GitHub, but I haven’t looked into it or tried it out yet. If it works as described, it might also be a solution variant.

In my bookmarks I found Solutions for bi-directional integration between Oracle RDBMS and Apa… from my past internet research on this. Here too, all the variants that support more than flat messages are either quite Oracle-specific and/or feel somehow not right.

I’m also wondering why this seems to be quite a rare issue. I don’t find much about it on the internet, but given how often I run into this problem I would expect a bigger need in the community for official support in the JDBC sink connector.


@an0r0c thanks for sharing this, and for creating the SMT. I just wanted to add on this point…

…I notice this problem crop up pretty often, but usually without a solution. I’m really pleased to see this thread developing here with concrete solutions and directions for people to follow and investigate :slight_smile:

Hi, @an0r0c.
First of all, I want to say thank you for your transformer. It definitely should be a part of the Confluent Platform :slight_smile:

As for this project, I’m doing it for educational purposes only. Almost all my current work is done with BigQuery and Composer.
At my previous job, I had to analyze data in Postgres, and all the data analysts suffered from irregular data migration from production Postgres and Mongo. As I remember, the IT department used Pentaho for this. That’s why I decided to build a proof of concept for my ex-colleagues showing that Kafka can be a fault-tolerant replacement.

You provided a really good summary of all possible solutions. But some of them can’t be used in my case.

  1. Writing your own connector requires good Java/Scala skills, and not all teams have experienced programmers. For example, I mostly use SQL and sometimes Python.
  2. That’s a very interesting idea. I’ll investigate it.
  3. Trying to make a flat table from the record is definitely the wrong way for me because of the complexity of the data schema.
    This is one of the structures that I have to handle: Encounter - FHIR v4.3.0
  4. Your SMT is the solution. I hope you’ll keep it maintained because I think a lot of people will need it. And maybe one day it’ll be on Confluent Hub.

I agree with you that some DB-side logic to parse the data is needed. But Postgres has a rich set of tools for working with JSON, and once the data is in the DWH you can use quite flexible SQL for that. Of course, first-level flattening would definitely be better, but the whole JSON in one column is also good.
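
For example, pulling the primary diagnosis codes out of the documents shown at the start of the thread could look like the sketch below (the table name conditions, the column name jsonstring - which I believe is the SMT’s default target field - and jsonb support in the target Greenplum version are assumptions here):

-- expand the "diagnosis" array and keep only the entries that carry a "primary" block
SELECT elem -> 'primary' ->> 'code' AS primary_code
FROM conditions,
     jsonb_array_elements(jsonstring::jsonb -> 'diagnosis') AS elem
WHERE elem ? 'primary';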


I found the video that goes with the slides from your bookmarks.

Triggered by this thread, I found the time/motivation to bring the SMT to Confluent Hub for easier access: Record to JSON String Transformation | Confluent Hub

I also wrote about how we ended up with that new SMT (more or less the same as what I wrote here in the thread, but with more context): Handle nested arrays in Kafka JDBC Sink Connector | BearingPoint Technology Advisory Austria


Wow. That’s amazing. Thank you @an0r0c.
I’ll update my project’s installation process.
I see that you updated the documentation in your GitHub repo. Cool :slight_smile:

Hi. This topic is already solved, thanks to @an0r0c, but in the end I built what I was looking for myself.
I used Christian’s transformer as an example and created yet another one.

It perfectly fits the use case described here, from MongoDB to Postgres, where documents have many first-level fields and just a few nested ones.

Christian’s SMT, in turn, is a good fit for a very complex document hierarchy.