Only Struct objects supported for [copying fields from value to key]

The RabbitMQSource connector throws the error below when I try to set the key from a field in the incoming nested JSON.

[2022-12-05 16:25:50,624] ERROR Error encountered in task RabbitMQSourceConnectorConnector_aerospike-0. Executing stage ‘TRANSFORMATION’ with class ‘org.apache.kafka.connect.transforms.ValueToKey’, where source record is = SourceRecord{sourcePartition={queues=padl.aerospikedatasyncDestPOC}, sourceOffset={deliveryTag=1}} ConnectRecord{topic=‘sandbox.com.schwab.preferences.padl.dev’, kafkaPartition=null, key=null, keySchema=Schema{STRING}, value=[B@245262b, valueSchema=Schema{BYTES}, timestamp=1670275550623, headers=ConnectHeaders(headers=[ConnectHeader(key=rabbitmq.consumer.tag, value=amq.ctag-2Z6cagGwDWpY6-y821_LbQ, schema=Schema{STRING}), ConnectHeader(key=rabbitmq.content.type, value=null, schema=Schema{STRING}), ConnectHeader(key=rabbitmq.content.encoding, value=null, schema=Schema{STRING}), ConnectHeader(key=rabbitmq.delivery.mode, value=1, schema=Schema{INT32}), ConnectHeader(key=rabbitmq.priority, value=0, schema=Schema{INT32}), ConnectHeader(key=rabbitmq.correlation.id, value=null, schema=Schema{STRING}), ConnectHeader(key=rabbitmq.reply.to, value=null, schema=Schema{STRING}), ConnectHeader(key=rabbitmq.expiration, value=null, schema=Schema{STRING}), ConnectHeader(key=rabbitmq.message.id, value=null, schema=Schema{STRING}), ConnectHeader(key=rabbitmq.timestamp, value=null, schema=Schema{org.apache.kafka.connect.data.Timestamp:INT64}), ConnectHeader(key=rabbitmq.type, value=null, schema=Schema{STRING}), ConnectHeader(key=rabbitmq.user.id, value=null, schema=Schema{STRING}), ConnectHeader(key=rabbitmq.app.id, value=null, schema=Schema{STRING}), ConnectHeader(key=rabbitmq.delivery.tag, value=1, schema=Schema{INT64}), ConnectHeader(key=rabbitmq.redeliver, value=true, schema=Schema{BOOLEAN}), ConnectHeader(key=rabbitmq.exchange, value=, schema=Schema{STRING}), ConnectHeader(key=rabbitmq.routing.key, value=padl.aerospikedatasyncDestPOC, schema=Schema{STRING})])}. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: [B
at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:81)
at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:346)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:261)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

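The RabbitMQ source connector only emits string or bytes values; it does not parse the payload, even when the payload is JSON. As the log shows (valueSchema=Schema{BYTES}, found: [B), the value reaching the transform is a raw byte array rather than a Struct, so ValueToKey has no fields to copy into the key.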

You’ll need to use Kafka Streams or ksqlDB to parse the value, and then move the fields you want from the value into the key.
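
For the Kafka Streams route, a minimal sketch might look like the following. It is not a definitive implementation and rests on several assumptions: the connector writes the raw JSON bytes to the topic unchanged (for example via ByteArrayConverter), Jackson is on the classpath for parsing, the broker address is localhost:9092, and both the output topic name (rekeyed-output) and the field path (/customer/id) are hypothetical placeholders, since the original post does not show the JSON layout.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.io.IOException;
import java.util.Properties;

public class RekeyJsonValues {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parses the raw bytes as JSON and returns the scalar field at the given
    // JSON Pointer as text, or null if the value is absent, not valid JSON,
    // or does not contain that field.
    static String extractKey(byte[] value, String pointer) {
        if (value == null) {
            return null;
        }
        try {
            JsonNode root = MAPPER.readTree(value);
            if (root == null) {
                return null;
            }
            JsonNode field = root.at(pointer);
            return (field.isMissingNode() || field.isNull()) ? null : field.asText();
        } catch (IOException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rekey-json-values");
        // Assumption: a locally reachable broker.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder
            // Topic name taken from the log above; values are read as raw bytes.
            .stream("sandbox.com.schwab.preferences.padl.dev",
                    Consumed.with(Serdes.String(), Serdes.ByteArray()))
            // Hypothetical field path; records that cannot be parsed keep a null key.
            .selectKey((oldKey, value) -> extractKey(value, "/customer/id"))
            // Hypothetical output topic; the value bytes are passed through unchanged.
            .to("rekeyed-output", Produced.with(Serdes.String(), Serdes.ByteArray()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

With this in place, the ValueToKey transform would be removed from the connector config, because the key is assigned downstream of Connect instead. Unparseable records are forwarded with a null key rather than dropped, which avoids silent data loss but means consumers of the output topic should expect null keys.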
