Extract escaped JSON stored as string that also has a special character

Background

  • Using Confluent Cloud (CC)
  • Set up a basic ksqlDB cluster on CC
  • Set up a MongoDB Atlas Source Connector on CC

The change event from MongoDB has a structure like this:

{
  "_id": {
    "_data": "826216BAFB0000000B2B022C0100296E5A1004742AD217CDB447F2A220AF47CF334D8A46645F69640064620DCC19303BFC8BD20F9FB80004"
  },

  "documentKey": {
    "_id": "{\"$oid\": \"620dcc19303bfc8bd20f9fb8\"}"
  },
  "ns": {
    "coll": "myCollection",
    "db": "myDatabase"
  },
  "operationType": "update",
  "updateDescription": {
    "updatedFields": {
      "description": "new description"
    }
  }
}
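
Note that documentKey._id is not a nested object: it is a string that itself contains escaped JSON, so the $oid value only becomes reachable after a second decoding pass. A minimal Python sketch of that double decoding, run outside ksqlDB purely to illustrate the data shape (extract_oid is a hypothetical helper name, and the event is trimmed to the documentKey field):

```python
import json

# Trimmed copy of the change event above. The raw string keeps the
# backslashes, so "_id" holds JSON text rather than a nested object.
raw_event = r'''
{
  "documentKey": {
    "_id": "{\"$oid\": \"620dcc19303bfc8bd20f9fb8\"}"
  }
}
'''


def extract_oid(event_text: str) -> str:
    """Return the $oid value from a change event (illustrative helper)."""
    event = json.loads(event_text)             # first pass: outer document
    inner_text = event["documentKey"]["_id"]   # still a str, not a dict
    inner = json.loads(inner_text)             # second pass: embedded JSON
    return inner["$oid"]


object_id = extract_oid(raw_event)
print(object_id)
```

Any extraction in ksqlDB has to account for the same two levels: first reaching the _id string, then parsing the JSON inside it.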

I created a basic stream in ksqlDB (focusing only on extracting the object ID):

 CREATE STREAM my_stream (documentKey VARCHAR)
 WITH (KAFKA_TOPIC='myDatabase.myCollection', VALUE_FORMAT='JSON');

I then attempted to use EXTRACTJSONFIELD to get the object ID:

CREATE STREAM extract_id_stream WITH (VALUE_FORMAT='AVRO') AS 
 SELECT EXTRACTJSONFIELD(documentKey,'[\"$oid"]') AS objectId
         FROM my_stream;

This returns NULL for objectId.

I’ve tried different variations as well:

  • Using the Flatten SMT, which produces documentKey._id, then attempting to extract $oid
  • Defining a stream that uses STRUCT and referencing $oid using backticks

Hopefully this is enough information to describe the problem.

I found a workaround: specifying SimplifiedJson in the MongoDB Source Connector changes the structure of the object ID.
