Mongo sink remove null values fields from Document

I’m reading bin-logs data from MySQL using debezium and sink payload data to MongoDB using Kafka-MongoDB sink connector. Following the example of Kafka’s message.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "Name"
      },
      {
        "type": "string",
        "optional": true,
        "field": "City"
      },
      {
        "type": "string",
        "optional": true,
        "field": "Email"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "io.debezium.time.Timestamp",
        "version": 1,
        "field": "Joindate"
      },
      {
        "type": "string",
        "optional": true,
        "field": "__table"
      },
      {
        "type": "string",
        "optional": true,
        "field": "__deleted"
      }
    ],
    "optional": false,
    "name": "Docker_Mysql.kafka_TEST.Testing.Value"
  },
  "payload": {
    "id": 4,
    "Name": "Sasha",
    "City": "Gresham",
    "Email": "arcu.Vestibulum.ante@elitpharetra.org",
    "Joindate": 1495964559000,
    "__table": "Testing",
    "__deleted": "false"
  }
}
{
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "Name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "City"
          },
          {
            "type": "string",
            "optional": true,
            "field": "Email"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "io.debezium.time.Timestamp",
            "version": 1,
            "field": "Joindate"
          },
          {
            "type": "string",
            "optional": true,
            "field": "__table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "__deleted"
          }
        ],
        "optional": false,
        "name": "Docker_Mysql.kafka_TEST.Testing.Value"
      },
      "payload": {
        "id": 11,
        "Name": "Test",
        "City": "Test",
        "Email": "Test@test.com",
        "Joindate": null,
        "__table": "Testing",
        "__deleted": "false"
      }
    }

So the Mongo DB document is

{
  "_id": 4,
  "Name": "Sasha",
  "City": "Gresham",
  "Email": "arcu.Vestibulum.ante@elitpharetra.org",
  "Joindate": "1495964559000",
  "__table": "Testing",
  "__deleted": "false"
},
{
   "_id": 11,
   "Name": "Test",
   "City": "Test",
   "Email": "Test@test.com",
  "Joindate": null,
  "__table": "Testing",
   "__deleted": "false"
 }

but if any key has a null value then remove that key value from the document
so a document with _id=11 should be like this to remove “Joindate” key value from mongo document.

{
“_id”: 11,
“Name”: “Test”,
“City”: “Test”,
“Email”: “Test@test.com”,
“__table”: “Testing”,
“__deleted”: “false”
}

So how can I dynamically configure the connector to remove null from Mongo document?

Code from Mongo Shell

db.Testing.update(
  {},
  [{ $replaceWith: {
    $arrayToObject: {
      $filter: {
        input: { $objectToArray: "$$ROOT" },
        as: "item",
        cond: { $ne: ["$$item.v", null] }
      }
    }
  }}],
  { multi: true }
)

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.