Hi Folks,
Can helps with the issue i facing now?
Basically i have a topic that constantly getting data from a syslog connector in json Serializer. the data that i received is in below sample format:
{
"schema": {
"type": "struct",
"fields": [{
"type": "string",
"optional": true,
"doc": "",
"field": "name"
}, {
"type": "string",
"optional": false,
"doc": "Type of message received",
"field": "type"
}, {
"type": "string",
"optional": true,
"doc": "",
"field": "message"
}
],
"optional": false,
"name": "io.confluent.syslog.Message"
},
"payload": {
"name": null,
"type": "RFC3164",
"message": "1,infrastructure,3,\"infra1,infra2,infra3\",dns,dns-base,1,1"
}
}
I would like to extract message attribute from the payload using ksqldb into a delimiter format. I’d tried with below example:
CREATE OR REPLACE STREAM txn_stream WITH(
KAFKA_TOPIC='txn.topic', FORMAT='DELIMITED', PARTITIONS=3, REPLICAS=3, VALUE_DELIMITER=','
)
AS
SELECT
SELECT
SPLIT(EXTRACTJSONFIELD(DATA, '$.payload.message'), ',') [1] col1
, SPLIT(EXTRACTJSONFIELD(DATA, '$.payload.message'), ',') [2] col2
, SPLIT(EXTRACTJSONFIELD(DATA, '$.payload.message'), ',') [3] col3
, SPLIT(EXTRACTJSONFIELD(DATA, '$.payload.message'), ',') [4] col4
....................................
from source_topic;
Unfortunately above methods still interpret comma within the double quotes which is not what my objective. My expected output would be similar as per below:
1,infrastructure,3,"infra1,infra2,infra3",dns,dns-base,1,1
2,infrastructure,5,"infra1,infra2",dns,dns-base,7,2
Can please advice is this achievable via ksqldb alone?
Appreciate any response.
Thanks.
Kenny.