Storing topic data in a single column as a JSON in Postgres

Dear Team,

I need to store the Kafka topic value (which is in Avro format) in a single column of a Postgres table, as JSON.
I tried using a sink connector, but it maps each field of the topic value to a different column in the table, whereas I need them all in a single column.
For example, the topic value below should be stored in a single column of a table:

{
"Field1" : "value1",
"Field2" : "value2",
"Field3" : "value3",
"Field4" : "value4"
}

Kindly suggest a way to achieve this. Let me know if my problem statement is not clear.

Hi @vmanivasagan, welcome to the forum. It looks like the answer is: not possible at the moment with that connector.
You can find some details in this SO thread. Here is the current documentation for the JDBC Sink; see the type mapping table:
https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/index.html#auto-creation-and-auto-evolution


@gianlucanatali can we achieve this using custom connectors? I have yet to read up on custom connector concepts.

As @gianlucanatali says, the JDBC Sink connector does not support a JSON target type. However, with a bit of stream processing to munge the data you can approximate something like what you describe.

I’m going to use ksqlDB - you can use Kafka Streams also to do this if you’d rather.

First off, declare a stream in ksqlDB over the source JSON topic. With the KAFKA format, ksqlDB will not try to apply a schema to the value, which lets us access the raw message.

ksql> CREATE STREAM RAW_JSON (MSG VARCHAR) 
        WITH (KAFKA_TOPIC='TRADES_JSON', 
        VALUE_FORMAT='KAFKA');

 Message
----------------
 Stream created
----------------
ksql> SELECT * FROM RAW_JSON EMIT CHANGES;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|MSG                                                                                                                                                                                       |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"SIDE":"SELL","QUANTITY":3665,"SYMBOL":"ZTEST","PRICE":205,"ACCOUNT":"LMN456","USERID":"User_3"}                                                                                         |
|{"SIDE":"SELL","QUANTITY":3162,"SYMBOL":"ZTEST","PRICE":441,"ACCOUNT":"ABC123","USERID":"User_9"}                                                                                         |

When using the JDBC Sink connector you need to supply a schema for the data, which raw JSON alone does not have, so we’re going to use the Schema Registry and Avro. We could use JSON Schema or Protobuf instead.

Here we take all the existing messages and all new ones as they arrive on the source JSON topic and write them as Avro to a new topic:

ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

ksql> CREATE STREAM WRAPPED_JSON WITH (FORMAT='AVRO') AS 
         SELECT * FROM RAW_JSON;

 Message
--------------------------------------------
 Created query with ID CSAS_WRAPPED_JSON_23
--------------------------------------------

The new topic now has the required JSON, but wrapped in a field of its own:

ksql> PRINT WRAPPED_JSON;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2021/03/16 15:34:26.155 Z, key: <null>, value: {"MSG": "{\"SIDE\":\"SELL\",\"QUANTITY\":4935,\"SYMBOL\":\"ZTEST\",\"PRICE\":252,\"ACCOUNT\":\"ABC123\",\"USERID\":\"User_9\"}"}, partition: 0
rowtime: 2021/03/16 15:34:26.533 Z, key: <null>, value: {"MSG": "{\"SIDE\":\"SELL\",\"QUANTITY\":2223,\"SYMBOL\":\"ZJZZT\",\"PRICE\":386,\"ACCOUNT\":\"XYZ789\",\"USERID\":\"User_1\"}"}, partition: 0
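
Note that MSG holds the JSON as a string, not as a nested record. Here is a small Python sketch of what that means for a reader of the data, using the printed rendering of the first value above. On the topic itself the value is Avro-encoded, so this only mirrors what PRINT displays:

```python
import json

# Printed rendering of one WRAPPED_JSON value, copied from the PRINT output above.
# (On the topic itself the value is Avro-encoded; this is only its JSON rendering.)
printed_value = r'{"MSG": "{\"SIDE\":\"SELL\",\"QUANTITY\":4935,\"SYMBOL\":\"ZTEST\",\"PRICE\":252,\"ACCOUNT\":\"ABC123\",\"USERID\":\"User_9\"}"}'

wrapper = json.loads(printed_value)

# The single field MSG is a plain string holding the original JSON document...
msg = wrapper["MSG"]
print(type(msg).__name__)

# ...so it takes a second parse to reach the fields inside it.
trade = json.loads(msg)
print(trade["SIDE"], trade["QUANTITY"])
```

That string field is what the JDBC sink will later write as a single column.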

Now we create the JDBC sink connector configuration to push the topic to the target DB. I’m doing this from ksqlDB but it’s just Kafka Connect - you can use the native REST API if you’d rather.

CREATE SINK CONNECTOR SINK_POSTGRES WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'WRAPPED_JSON',
    'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true'
  );
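
If you would rather skip ksqlDB for this step, here is a minimal sketch of sending the same configuration to the Kafka Connect REST API from Python. The worker address http://localhost:8083 and the helper name build_connector_request are assumptions for illustration; the configuration values are copied from the statement above.

```python
import json
import urllib.request

# Connector settings copied from the CREATE SINK CONNECTOR statement above.
SINK_CONFIG = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "topics": "WRAPPED_JSON",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "auto.create": "true",
}


def build_connector_request(connect_url, name, config):
    """Build a PUT /connectors/<name>/config request (creates or updates)."""
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Assumed worker address; adjust to wherever your Connect worker listens.
request = build_connector_request("http://localhost:8083", "SINK_POSTGRES", SINK_CONFIG)

# To actually submit it (needs a reachable Connect worker):
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

The request is only built here, not sent, so the sketch runs without a Connect cluster; uncomment the last lines to submit it.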

When we head to Postgres we have the new table:

postgres=# \d
            List of relations
 Schema |     Name     | Type  |  Owner
--------+--------------+-------+----------
 public | WRAPPED_JSON | table | postgres
(1 row)

postgres=# \d "WRAPPED_JSON"
          Table "public.WRAPPED_JSON"
 Column | Type | Collation | Nullable | Default
--------+------+-----------+----------+---------
 MSG    | text |           |          |

and the JSON data from the original source topic as a single column in the table:

postgres=# SELECT "MSG" FROM "WRAPPED_JSON" LIMIT 5;
                                               MSG
--------------------------------------------------------------------------------------------------
 {"SIDE":"SELL","QUANTITY":4581,"SYMBOL":"ZVV","PRICE":109,"ACCOUNT":"XYZ789","USERID":"User_5"}
 {"SIDE":"BUY","QUANTITY":2367,"SYMBOL":"ZVV","PRICE":35,"ACCOUNT":"XYZ789","USERID":"User_8"}
 {"SIDE":"SELL","QUANTITY":666,"SYMBOL":"ZJZZT","PRICE":302,"ACCOUNT":"XYZ789","USERID":"User_3"}
 {"SIDE":"BUY","QUANTITY":4630,"SYMBOL":"ZWZZT","PRICE":35,"ACCOUNT":"ABC123","USERID":"User_8"}
 {"SIDE":"BUY","QUANTITY":4921,"SYMBOL":"ZJZZT","PRICE":913,"ACCOUNT":"LMN456","USERID":"User_9"}
(5 rows)
postgres=# SELECT "MSG"::json->'SIDE' AS SIDE,
                  "MSG"::json->'QUANTITY' AS QUANTITY 
             FROM "WRAPPED_JSON" 
            LIMIT 5;
  side  | quantity
--------+----------
 "SELL" | 4581
 "BUY"  | 2367
 "SELL" | 666
 "BUY"  | 4630
 "BUY"  | 4921
(5 rows)

With 'auto.create' = 'true' the connector creates the table using the data mapping that @gianlucanatali referred to (so in the case of Postgres, STRING maps to TEXT). However, if you pre-create the target table in Postgres you can define the column as JSON (or JSONB):

postgres=# CREATE TABLE JSON_EXAMPLE ("MSG" JSONB);
CREATE TABLE

and the connector will happily insert rows into it:

postgres=# \d json_example
           Table "public.json_example"
 Column | Type  | Collation | Nullable | Default
--------+-------+-----------+----------+---------
 MSG    | jsonb |           |          |

postgres=# select * from json_example limit 5 ;
                                                     MSG
-------------------------------------------------------------------------------------------------------------
 {"SIDE": "SELL", "PRICE": 109, "SYMBOL": "ZVV", "USERID": "User_5", "ACCOUNT": "XYZ789", "QUANTITY": 4581}
 {"SIDE": "BUY", "PRICE": 35, "SYMBOL": "ZVV", "USERID": "User_8", "ACCOUNT": "XYZ789", "QUANTITY": 2367}
 {"SIDE": "SELL", "PRICE": 302, "SYMBOL": "ZJZZT", "USERID": "User_3", "ACCOUNT": "XYZ789", "QUANTITY": 666}
 {"SIDE": "BUY", "PRICE": 35, "SYMBOL": "ZWZZT", "USERID": "User_8", "ACCOUNT": "ABC123", "QUANTITY": 4630}
 {"SIDE": "BUY", "PRICE": 913, "SYMBOL": "ZJZZT", "USERID": "User_9", "ACCOUNT": "LMN456", "QUANTITY": 4921}
(5 rows)

@vmanivasagan, you can also use this really simple but great SMT: an0r0c/kafka-connect-transform-tojsonstring on GitHub. transform-to-json-string is a Single Message Transformation (SMT) for Apache Kafka® Connect that converts a given Connect record to a single JSON string. It's an unofficial community project.

In this case, no ksqlDB or Kafka Streams application is needed.

You will only need to add something like this in your JDBC Sink Connector configuration:

...
"transforms": "ValueToJson",
"transforms.ValueToJson.type": "com.github.cedelsb.kafka.connect.smt.Record2JsonStringConverter$Value",
"transforms.ValueToJson.json.string.field.name" : "JSON_PAYLOAD",
...
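
To make the effect of the transform concrete, here is a rough Python illustration (not the SMT's actual code) of the reshaping it performs: a multi-field value becomes a record with one string field. The field name JSON_PAYLOAD comes from the configuration above; the function name to_json_string_record is hypothetical, and the exact formatting of the string the real SMT emits may differ.

```python
import json


def to_json_string_record(value, field_name="JSON_PAYLOAD"):
    """Collapse a multi-field record into one field holding its JSON text."""
    return {field_name: json.dumps(value, separators=(",", ":"))}


# The example value from the original question.
record = {
    "Field1": "value1",
    "Field2": "value2",
    "Field3": "value3",
    "Field4": "value4",
}

flattened = to_json_string_record(record)
print(flattened)
```

With a single string field like this, the JDBC sink has only one column to write, which is what the original question asked for.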

Ooooh nice, I’ll have to try that one out @whatsupbros!


Thanks, @whatsupbros! I achieved a solution for my scenario with the custom SMT, as per your suggestion. My sincere thanks to @rmoff as well for the quick response.

