I'm reading binlog data from MySQL using Debezium and sinking the payload data to MongoDB using the Kafka MongoDB sink connector. The following are examples of the Kafka messages.
{ "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "Name" }, { "type": "string", "optional": true, "field": "City" }, { "type": "string", "optional": true, "field": "Email" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "Joindate" }, { "type": "string", "optional": true, "field": "__table" }, { "type": "string", "optional": true, "field": "__deleted" } ], "optional": false, "name": "Docker_Mysql.kafka_TEST.Testing.Value" }, "payload": { "id": 4, "Name": "Sasha", "City": "Gresham", "Email": "arcu.Vestibulum.ante@elitpharetra.org", "Joindate": 1495964559000, "__table": "Testing", "__deleted": "false" } }
{ "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "Name" }, { "type": "string", "optional": true, "field": "City" }, { "type": "string", "optional": true, "field": "Email" }, { "type": "int64", "optional": true, "name": "io.debezium.time.Timestamp", "version": 1, "field": "Joindate" }, { "type": "string", "optional": true, "field": "__table" }, { "type": "string", "optional": true, "field": "__deleted" } ], "optional": false, "name": "Docker_Mysql.kafka_TEST.Testing.Value" }, "payload": { "id": 11, "Name": "Test", "City": "Test", "Email": "Test@test.com", "Joindate": null, "__table": "Testing", "__deleted": "false" } }
So the resulting MongoDB documents are:
{
"_id": 4,
"Name": "Sasha",
"City": "Gresham",
"Email": "arcu.Vestibulum.ante@elitpharetra.org",
"Joindate": "1495964559000",
"__table": "Testing",
"__deleted": "false"
},
{
"_id": 11,
"Name": "Test",
"City": "Test",
"Email": "Test@test.com",
"Joindate": null,
"__table": "Testing",
"__deleted": "false"
}
However, if any key has a null value, I want that key removed from the document.
So the document with _id=11 should look like this, with the "Joindate" key removed:
{
"_id": 11,
"Name": "Test",
"City": "Test",
"Email": "Test@test.com",
"__table": "Testing",
"__deleted": "false"
}
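So how can I configure the connector to remove null-valued keys from the MongoDB document dynamically, whichever field happens to be null?
The following mongo shell update removes null-valued keys from documents that are already stored: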
db.Testing.update(
  {},
  [{ $replaceWith: {
    $arrayToObject: {
      $filter: {
        input: { $objectToArray: "$$ROOT" },
        as: "item",
        cond: { $ne: ["$$item.v", null] }
      }
    }
  }}],
  { multi: true }
)
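To make the intended transformation explicit, here is a minimal sketch in plain JavaScript of the same top-level filter that the pipeline above applies. The helper name stripNullFields is hypothetical and not part of the connector or of MongoDB; it only illustrates the per-document logic that would need to run before the write.

```javascript
// Hypothetical helper (not a connector or MongoDB API): returns a copy of
// the document without any top-level field whose value is null.
// Nested documents are left untouched, as in the shell pipeline.
function stripNullFields(doc) {
  return Object.fromEntries(
    Object.entries(doc).filter(([, value]) => value !== null)
  );
}

// The document with _id 11 from the example above.
const doc = {
  _id: 11,
  Name: "Test",
  City: "Test",
  Email: "Test@test.com",
  Joindate: null,
  __table: "Testing",
  __deleted: "false"
};

const cleaned = stripNullFields(doc);
console.log(cleaned);
```

The function builds a new object rather than deleting keys in place, so the original document stays unchanged.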