JDBC Connector - jsonb string field flattening

Hi Confluent community

I have a source database that stores nested JSON in certain columns, i.e. multiple blockchain transactions in a single database field. The JDBC source connector then “stringifies” these fields. My sink is Apache Druid, which is then unable to flatten the “stringified” fields. My Kafka messages look like the following, where the “txn” value is stringified:

  "payload": {
    "round": 285889,
    "intra": 0,
    "typeenum": 1,
    "asset": 0,
    "txid": "VTN...05HTTZUUQ==",
    "txn": "{
  \"sig\": \"wI9ILzU...b0QDg==\", \"txn\": {\"fv\": 285885, \"gh\": \"wGHE2Pwdvd7S12BL5FaOP20EGYesN73ktiC1qzkkit8=\", \"lv\": 286885, \"amt\": 799900000, \"fee\": 1500, \"gen\": \"mainnet-v1.0\", \"rcv\": \"g6yKSt...lmohMQdnc=\", \"snd\": \"gPzZI1utLBpSfXpTX6s75AlooclIBSwDTo/5uIoQTVY=\", \"type\": \"pay\"}}",
      "extra": "{}"
  }

The message should eventually look like this, with “txn” as a real nested object:

{
    "round": 285889,
    "intra": 0,
    "typeenum": 1,
    "asset": 0,
    "txid": "VTN...05HTTZUUQ==",
    "txn": {
        "rs": 30576000000,
        "txn": {
            "fv": 62000,
            "gh": "wGHE2P73kt...iC1qzkkit8=",
            "lv": 63000,
            "amt": 100000,
            "fee": 1000,
            "gen": "mainnet-v1.0",
            "rcv": "AszogaMAA...AAA=",
            "snd": "Rvf....w=",
            "note": "...",
            "type": "pay"
        }
    }
}

Is there a way to overcome this, so that the database field can be read as JSON?
I’ve been looking into SMTs and flattening; one rough idea is sketched after the connector config below. My JDBC connector currently looks like the following:

{
  "name": "JdbcSourceConnectorConnector_Algorand_Pgsql_Txn_0",
  "config": {
    "name": "JdbcSourceConnectorConnector_Algorand_Pgsql_Txn_0",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": ".",
    "connection.url": "jdbc:postgresql://<HOST>:5432/indexer",
    "connection.user": "postgres",
    "connection.password": "********",
    "table.whitelist": "txn",
    "mode": "bulk",
    "incrementing.column.name": "",
    "validate.non.null": "true",
    "quote.sql.identifiers": "always",
    "topic.prefix": "algorand-pqsql-"
  }
}
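
One direction I’ve been considering is a small custom SMT that parses the stringified column back into nested maps before the value converter serializes the record. Below is a rough, untested sketch of what I have in mind (the class name, package, and the "fields" option are placeholders I made up, not an existing Confluent transform); it re-emits the value schemaless so the JsonConverter writes “txn” as a real nested object:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of a custom SMT: parses selected string fields as JSON so downstream
// converters see nested objects instead of escaped strings.
public class ParseJsonString<R extends ConnectRecord<R>> implements Transformation<R> {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  private static final ConfigDef CONFIG_DEF = new ConfigDef()
      .define("fields", ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
          "String fields whose contents should be parsed as JSON");

  private List<String> fields;

  @Override
  public void configure(Map<String, ?> configs) {
    fields = new AbstractConfig(CONFIG_DEF, configs).getList("fields");
  }

  @Override
  public R apply(R record) {
    if (!(record.value() instanceof Struct)) {
      return record;                                 // only handle Struct values from the JDBC source
    }
    Struct value = (Struct) record.value();
    Map<String, Object> updated = new HashMap<>();
    for (Field field : value.schema().fields()) {
      Object fieldValue = value.get(field);
      if (fields.contains(field.name()) && fieldValue instanceof String) {
        try {
          // turn the escaped JSON string into nested Maps/Lists
          updated.put(field.name(), MAPPER.readValue((String) fieldValue, Map.class));
        } catch (Exception e) {
          updated.put(field.name(), fieldValue);     // leave unparsable values untouched
        }
      } else {
        updated.put(field.name(), fieldValue);
      }
    }
    // re-emit the value without a schema so the JsonConverter writes nested JSON objects
    return record.newRecord(record.topic(), record.kafkaPartition(),
        record.keySchema(), record.key(), null, updated, record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return CONFIG_DEF;
  }

  @Override
  public void close() {
  }
}

If that approach is sound, I’d put the jar on the Connect plugin path and register it in the connector with something like transforms=parsetxn, transforms.parsetxn.type set to that class, and transforms.parsetxn.fields=txn,extra. Is that reasonable, or is there an existing SMT that already does this?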

Thanks for your help!
