Unwanted escape sequence when rewriting to another topic

Hello everyone,

We are using KSQL 7.4.3 to copy events from a TOPIC_B to a TOPIC_B
The query is defined as follows

insert into TOPIC_B select  * from TOPIC_A emit changes;

The event is as follows in TOPIC_A

{
  "_CLAZZ": "CLASS_INFO",
  "_OPERATION": "UPDATE",
  "_PAYLOAD": {
    "container": {
      "created": 1699422706,
      "extId": "903293",
      "id": "97ezhj3vzc94no9",
      "tagNr": "Test-TAG 123",
      "type": "Typ1",
      "version": 2
    },
    "created": 1699533999 ,
    "id": "97ezhj3vzc94no9ssda34we4345",
    "incomingTaId": "9608kfgdfkgn4950645096ndgngd83h7r57h",
    "loadingPositionsJson": "[{\"position\":1,\"destId\":null,\"numberOfUnits\":12,\"numberOfUnitsOriginal\":12,\"note\":null,\"loadingTimestamp\":\"2023-10-12T15:28:03.537Z\"}]",
    "loadingStartTime": null
  },
  "_TIME": 1699533999
}

This is the result after the copy into TOPIC_B

{
    "_CLAZZ": "CLASS_INFO",
    "_OPERATION": "UPDATE",
    "_PAYLOAD": "{\"container\":{\"created\":1699422706,\"extId\":\"903293\",\"id\":\"97ezhj3vzc94no9\",\"tagNr\":\"Test-TAG 123\",\"type\":\"Typ1\",\"version\":2},\"created\":1699533999,\"id\":\"97ezhj3vzc94no9ssda34we4345\",\"incomingTaId\":\"9608kfgdfkgn4950645096ndgngd83h7r57h\",\"loadingPositionsJson\":\"[{\\\"position\\\":1,\\\"destId\\\":null,\\\"numberOfUnits\\\":12,\\\"numberOfUnitsOriginal\\\":12,\\\"note\\\":null,\\\"loadingTimestamp\\\":\\\"2023-10-12T15:28:03.537Z\\\"}]\",\"loadingStartTime\":null}",
    "_TIME": 1699533999
 }

this means that everything below _PAYLOAD is unintentionally escaped

The stream is defined as follows

 Name                 : STREAM_TEST
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : TOPIC_B (partitions: 1, replication: 3)
Statement            : CREATE STREAM STREAM_TEST (ROWKEY STRING KEY, _CLAZZ STRING, _OPERATION STRING, _PAYLOAD STRING, _TIME BIGINT) WITH (KAFKA_TOPIC='TOPIC_B', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

Is there a way to prevent the string under _PAYLOAD from being escpaed?

greetings niesel

How did you create the TOPIC_A stream and insert data? The TOPIC_A event that you shared looks like the _PAYLOAD field is a STRUCT but you would hit an incompatible schema error if that were the case so it seems like there is a TO_JSON_STRING call or similar happening at some point.

create stream stream_TOPIC_A
    (ROWKEY varchar KEY, `_CLAZZ` varchar,
    `_OPERATION` varchar,
    `_PAYLOAD` varchar,
    `_TIME` bigint)
with (kafka_topic='TOPIC_A', value_format='JSON');

This topic was automatically closed after 30 days. New replies are no longer allowed.