JDBC Source Connector with a JSON payload containing an array-of-records field

Hi All,

We are trying to use the Kafka JDBC Source Connector to read from a PostgreSQL JSONB column. The payload has a field which is an array of records. When we use JsonConverter we get an InvalidRecordException:

Caused by: org.apache.kafka.common.InvalidRecordException: Log record DefaultRecord(offset=0, timestamp=1733863040526, key=0 bytes, value=993 bytes) is rejected by the record interceptor io.confluent.kafka.schemaregistry.validator.RecordSchemaValidator

If we use AvroConverter, we get a "Not in Union" exception.

Schema:

{
  "type": "record",
  "namespace": "com.discover.fraud.card.account.activity",
  "name": "AccountServicingAudit",
  "doc": "This event send audit details",
  "fields": [
    {
      "name": "accountKey",
      "type": "string",
      "doc": "Account Key used while processing request."
    },
    {
      "name": "transferSeqNumber",
      "type": "string",
      "doc": "Transfer Sequence Number used while processing request."
    },
    {
      "name": "racfId",
      "type": "string",
      "doc": "Racf Id"
    },
    {
      "name": "timestamp",
      "type": "string",
      "doc": "Timestamp of error"
    },
    {
      "name": "changeType",
      "type": "string",
      "doc": "Change Type"
    },
    {
      "name": "changeDetails",
      "doc": "List of fields/attributes updated",
      "type": [
        {
          "type": "array",
          "items": [
            {
              "type": "record",
              "name": "changeDetails",
              "fields": [
                {
                  "name": "name",
                  "type": "string",
                  "doc": "Name of the attribute modified"
                },
                {
                  "name": "value",
                  "type": "string",
                  "doc": "Value of the attribute modified"
                }
              ]
            }
          ]
        }
      ]
    }
  ]
}
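Could the problem be the union wrapping in the schema above? Both the changeDetails type and its items are declared as single-branch unions (a JSON array around them), which means Avro treats them as union types when matching data. A minimal sketch of the same field declared as a plain array of records, with the unions removed (the record name changeDetails is kept from our schema; this is what we think the shape should be, not something we have verified):

```json
{
  "name": "changeDetails",
  "doc": "List of fields/attributes updated",
  "type": {
    "type": "array",
    "items": {
      "type": "record",
      "name": "changeDetails",
      "fields": [
        { "name": "name",  "type": "string", "doc": "Name of the attribute modified" },
        { "name": "value", "type": "string", "doc": "Value of the attribute modified" }
      ]
    }
  }
}
```

Would a schema of this shape avoid the "Not in Union" error, or does the converter need the union form?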

Payload:

{
  "racfId": "ABCD",
  "timestamp": "2024-Jan-25 12:24:16.753",
  "accountKey": "1234567890",
  "changeType": "TEST",
  "changeDetails": [
    {
      "name": "Status",
      "value": "R"
    },
    {
      "name": "4506TFlag",
      "value": "102424"
    },
    {
      "name": "fcmsQueueName",
      "value": "Outsourcing 13"
    },
    {
      "name": "orderStatus",
      "value": "DIRECTREVOKE"
    }
  ],
  "transferSeqNumber": "001"
}

Kafka Connect config:

curl -i -X PUT \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  http://localhost:8080/connectors/cboAudit_source_connector/config \
  -d '{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "URL",
  "connection.user": "ID",
  "connection.password": "PWD",
  "connection.attempts": "3",
  "query.retry.attempts": "3",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "numeric.mapping": "best_fit",
  "poll.interval.ms": "5000",
  "topic.prefix": "TOPIC",
  "mode": "timestamp+incrementing",
  "timestamp.column.name": "crte_tms",
  "incrementing.column.name": "process_seq_nbr",
  "key.converter.latest.compatibility.strict": "false",
  "key.converter.schemas.enable": "true",
  "key.converter.use.latest.version": "true",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
  "value.converter.auto.register.schemas": "false",
  "value.converter.latest.compatibility.strict": "false",
  "value.converter.schema.registry.ssl.truststore.location": "JKS",
  "value.converter.schema.registry.ssl.truststore.password": "pswd",
  "value.converter.schema.registry.url": "Registry-loc",
  "key.converter.schema.registry.url": "Registry-loc",
  "value.converter.schemas.enable": "true",
  "value.converter.use.latest.version": "true",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
  "query": "SELECT * FROM (select FLX_PYLD_EVNT::json->> '"'"'accountKey'"'"' as accountKey,FLX_PYLD_EVNT::json->> '"'"'transferSeqNumber'"'"' AS transferSeqNumber,FLX_PYLD_EVNT::json->> '"'"'racfId'"'"' as racfId,FLX_PYLD_EVNT::json->> '"'"'timestamp'"'"' as timestamp,FLX_PYLD_EVNT::json->> '"'"'changeType'"'"' as changeType,FLX_PYLD_EVNT::json-> '"'"'changeDetails'"'"' as changeDetails,crte_tms,process_seq_nbr from CBO_PROCESS_ACTIVITY where FLUX_TPC_NM='"'"'business.direct-banking.cross-product.fraud.cards.account.activity'"'"') AS AuditActivity"
}'
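For readability, here is the SQL from the query property with the shell quoting expanded (each '"'"' sequence collapses to a single quote once the shell parses the curl argument):

```sql
SELECT * FROM (
  SELECT FLX_PYLD_EVNT::json ->> 'accountKey'        AS accountKey,
         FLX_PYLD_EVNT::json ->> 'transferSeqNumber' AS transferSeqNumber,
         FLX_PYLD_EVNT::json ->> 'racfId'            AS racfId,
         FLX_PYLD_EVNT::json ->> 'timestamp'         AS timestamp,
         FLX_PYLD_EVNT::json ->> 'changeType'        AS changeType,
         FLX_PYLD_EVNT::json ->  'changeDetails'     AS changeDetails,
         crte_tms,
         process_seq_nbr
  FROM CBO_PROCESS_ACTIVITY
  WHERE FLUX_TPC_NM = 'business.direct-banking.cross-product.fraud.cards.account.activity'
) AS AuditActivity
```

Note that changeDetails is extracted with -> (which returns json) while the other fields use ->> (which returns text), so the connector presumably receives changeDetails as a single JSON document rather than as structured fields. Could that be why the converters fail to match it against the array-of-records schema?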