As @gianlucanatali says, the JDBC Sink connector does not support a JSON
target type. However, with a bit of stream processing to munge the data you can approximate what you describe.
I’m going to use ksqlDB - you can use Kafka Streams also to do this if you’d rather.
First off, declare a stream in ksqlDB over the source JSON topic. By using the KAFKA
format, ksqlDB won't try to apply a schema to the data, which lets us access the raw message.
ksql> CREATE STREAM RAW_JSON (MSG VARCHAR)
WITH (KAFKA_TOPIC='TRADES_JSON',
VALUE_FORMAT='KAFKA');
Message
----------------
Stream created
----------------
ksql> SELECT * FROM RAW_JSON EMIT CHANGES;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|MSG |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"SIDE":"SELL","QUANTITY":3665,"SYMBOL":"ZTEST","PRICE":205,"ACCOUNT":"LMN456","USERID":"User_3"} |
|{"SIDE":"SELL","QUANTITY":3162,"SYMBOL":"ZTEST","PRICE":441,"ACCOUNT":"ABC123","USERID":"User_9"} |
When using the JDBC Sink connector you need to supply a schema for the data, which raw JSON alone does not have, so we’re going to use the Schema Registry and Avro. We could use JSON Schema or Protobuf instead.
Here we take all the existing messages and all new ones as they arrive on the source JSON topic and write them as Avro to a new topic:
ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> CREATE STREAM WRAPPED_JSON WITH (FORMAT='AVRO') AS
SELECT * FROM RAW_JSON;
Message
--------------------------------------------
Created query with ID CSAS_WRAPPED_JSON_23
--------------------------------------------
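The same pattern works if you'd rather use Protobuf or JSON Schema instead of Avro - a minimal sketch, assuming your ksqlDB and Schema Registry versions support those formats (the stream name here is just illustrative):
ksql> CREATE STREAM WRAPPED_JSON_PROTOBUF WITH (FORMAT='PROTOBUF') AS
SELECT * FROM RAW_JSON;
If you go this route you'd also switch the converters in the sink connector config further down to the matching ones (e.g. io.confluent.connect.protobuf.ProtobufConverter).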
The new topic now has the required JSON, but wrapped in a field of its own:
ksql> PRINT WRAPPED_JSON;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2021/03/16 15:34:26.155 Z, key: <null>, value: {"MSG": "{\"SIDE\":\"SELL\",\"QUANTITY\":4935,\"SYMBOL\":\"ZTEST\",\"PRICE\":252,\"ACCOUNT\":\"ABC123\",\"USERID\":\"User_9\"}"}, partition: 0
rowtime: 2021/03/16 15:34:26.533 Z, key: <null>, value: {"MSG": "{\"SIDE\":\"SELL\",\"QUANTITY\":2223,\"SYMBOL\":\"ZJZZT\",\"PRICE\":386,\"ACCOUNT\":\"XYZ789\",\"USERID\":\"User_1\"}"}, partition: 0
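If you want to double-check the schema that ksqlDB has registered for the new stream you can describe it - it should show a single MSG field of type VARCHAR(STRING):
ksql> DESCRIBE WRAPPED_JSON;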
Now we create the JDBC sink connector configuration to push the topic to the target DB. I’m doing this from ksqlDB but it’s just Kafka Connect - you can use the native REST API if you’d rather.
CREATE SINK CONNECTOR SINK_POSTGRES WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'WRAPPED_JSON',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true'
);
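Because the connector was created through ksqlDB you can also check on it from the same prompt (assuming ksqlDB is pointed at your Kafka Connect worker); these commands just report the connector's state and any task errors:
ksql> SHOW CONNECTORS;
ksql> DESCRIBE CONNECTOR SINK_POSTGRES;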
When we head over to Postgres we can see the new table:
postgres=# \d
List of relations
Schema | Name | Type | Owner
--------+--------------+-------+----------
public | WRAPPED_JSON | table | postgres
(1 row)
postgres=# \d "WRAPPED_JSON"
Table "public.WRAPPED_JSON"
Column | Type | Collation | Nullable | Default
--------+------+-----------+----------+---------
MSG | text | | |
and the JSON data from the original source topic as a single column in the table:
postgres=# SELECT "MSG" FROM "WRAPPED_JSON" LIMIT 5;
MSG
--------------------------------------------------------------------------------------------------
{"SIDE":"SELL","QUANTITY":4581,"SYMBOL":"ZVV","PRICE":109,"ACCOUNT":"XYZ789","USERID":"User_5"}
{"SIDE":"BUY","QUANTITY":2367,"SYMBOL":"ZVV","PRICE":35,"ACCOUNT":"XYZ789","USERID":"User_8"}
{"SIDE":"SELL","QUANTITY":666,"SYMBOL":"ZJZZT","PRICE":302,"ACCOUNT":"XYZ789","USERID":"User_3"}
{"SIDE":"BUY","QUANTITY":4630,"SYMBOL":"ZWZZT","PRICE":35,"ACCOUNT":"ABC123","USERID":"User_8"}
{"SIDE":"BUY","QUANTITY":4921,"SYMBOL":"ZJZZT","PRICE":913,"ACCOUNT":"LMN456","USERID":"User_9"}
(5 rows)
postgres=# SELECT "MSG"::json->'SIDE' AS SIDE,
"MSG"::json->'QUANTITY' AS QUANTITY
FROM "WRAPPED_JSON"
LIMIT 5;
side | quantity
--------+----------
"SELL" | 4581
"BUY" | 2367
"SELL" | 666
"BUY" | 4630
"BUY" | 4921
(5 rows)
With 'auto.create' = 'true' the connector creates the table using the data mapping that @gianlucanatali referred to (so in the case of Postgres, STRING → TEXT). However, if you pre-create the target table in Postgres you can set it as a JSON (or JSONB) field:
postgres=# CREATE TABLE JSON_EXAMPLE ("MSG" JSONB);
CREATE TABLE
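For the connector to write into this pre-created table rather than auto-creating its own, you'd point it at the table explicitly - a minimal sketch, assuming the JDBC sink's table.name.format property and reusing the rest of the config from above (the connector name here is just illustrative):
CREATE SINK CONNECTOR SINK_POSTGRES_JSONB WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'WRAPPED_JSON',
'table.name.format' = 'json_example',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'false'
);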
and the connector will happily insert rows into it:
postgres=# \d json_example
Table "public.json_example"
Column | Type | Collation | Nullable | Default
--------+-------+-----------+----------+---------
MSG | jsonb | | |
postgres=# select * from json_example limit 5;
MSG
-------------------------------------------------------------------------------------------------------------
{"SIDE": "SELL", "PRICE": 109, "SYMBOL": "ZVV", "USERID": "User_5", "ACCOUNT": "XYZ789", "QUANTITY": 4581}
{"SIDE": "BUY", "PRICE": 35, "SYMBOL": "ZVV", "USERID": "User_8", "ACCOUNT": "XYZ789", "QUANTITY": 2367}
{"SIDE": "SELL", "PRICE": 302, "SYMBOL": "ZJZZT", "USERID": "User_3", "ACCOUNT": "XYZ789", "QUANTITY": 666}
{"SIDE": "BUY", "PRICE": 35, "SYMBOL": "ZWZZT", "USERID": "User_8", "ACCOUNT": "ABC123", "QUANTITY": 4630}
{"SIDE": "BUY", "PRICE": 913, "SYMBOL": "ZJZZT", "USERID": "User_9", "ACCOUNT": "LMN456", "QUANTITY": 4921}
(5 rows)
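Once the data is in a JSONB column you also get the full set of Postgres JSON operators and indexing - for example, pulling fields out with ->> and adding a GIN index (the index name is just illustrative):
postgres=# SELECT "MSG"->>'SYMBOL' AS symbol,
                  ("MSG"->>'PRICE')::int AS price
           FROM json_example
           WHERE "MSG"->>'SIDE' = 'BUY'
           LIMIT 5;

postgres=# CREATE INDEX idx_json_example_msg ON json_example USING GIN ("MSG");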