Background
- Using Confluent Cloud
- Set up a basic ksqlDB cluster on Confluent Cloud (CC)
- Set up a MongoDB Atlas Source Connector on CC
The change event from MongoDB has a structure like this:
{
  "_id": {
    "_data": "826216BAFB0000000B2B022C0100296E5A1004742AD217CDB447F2A220AF47CF334D8A46645F69640064620DCC19303BFC8BD20F9FB80004"
  },
  "documentKey": {
    "_id": "{\"$oid\": \"620dcc19303bfc8bd20f9fb8\"}"
  },
  "ns": {
    "coll": "myCollection",
    "db": "myDatabase"
  },
  "operationType": "update",
  "updateDescription": {
    "updatedFields": {
      "description": "new description"
    }
  }
}
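One detail in this payload is that documentKey._id is not a nested object. It is a string whose contents are themselves JSON (note the escaped quotes), so reaching $oid takes two parses: one for the record and one for the embedded _id string. The following is a minimal Python sketch, run outside ksqlDB, that assumes the record value is exactly the sample event above. The helper name extract_oid is hypothetical and only illustrates the data's shape. It is not a ksqlDB fix.

```python
import json

# Sample change event copied from above. documentKey._id is a *string*
# that contains JSON (the quotes inside it are escaped).
RAW_EVENT = r'''
{
  "_id": {"_data": "826216BAFB0000000B2B022C0100296E5A1004742AD217CDB447F2A220AF47CF334D8A46645F69640064620DCC19303BFC8BD20F9FB80004"},
  "documentKey": {"_id": "{\"$oid\": \"620dcc19303bfc8bd20f9fb8\"}"},
  "ns": {"coll": "myCollection", "db": "myDatabase"},
  "operationType": "update",
  "updateDescription": {"updatedFields": {"description": "new description"}}
}
'''


def extract_oid(raw_event: str) -> str:
    """Hypothetical helper: return the ObjectId hex string from a change event."""
    event = json.loads(raw_event)            # first parse: the whole record
    id_field = event["documentKey"]["_id"]
    # id_field is a str like '{"$oid": "..."}', not a dict, so it has to be
    # parsed a second time before "$oid" can be looked up.
    if not isinstance(id_field, str):
        raise TypeError("expected documentKey._id to be a JSON-encoded string")
    return json.loads(id_field)["$oid"]      # second parse: the embedded JSON


if __name__ == "__main__":
    print(extract_oid(RAW_EVENT))  # 620dcc19303bfc8bd20f9fb8
```

If ksqlDB sees the same double encoding, a single JSONPath applied to documentKey may not reach $oid. That is an inference from the sample payload, not something verified in ksqlDB here.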
I created a basic stream in ksqlDB (focusing only on extracting the object ID):
CREATE STREAM my_stream (documentKey VARCHAR)
WITH (KAFKA_TOPIC='myDatabase.myCollection', VALUE_FORMAT='JSON');
Then I attempted to use EXTRACTJSONFIELD to get the object ID:
CREATE STREAM extract_id_stream WITH (VALUE_FORMAT='AVRO') AS
SELECT EXTRACTJSONFIELD(documentKey,'[\"$oid"]') AS objectId
FROM my_stream;
This returns NULL for objectId.
I’ve also tried other variations:
- Using the Flatten SMT, which produces documentKey._id, and then attempting to extract $oid
- Defining a stream that uses a STRUCT and referencing $oid with backticks
Hopefully this is enough information to describe the problem.