How to flatten Kafka message in EventBridge Sink Connector so headers and payload aren't nested inside value?

Hi all,

I’m using the Amazon EventBridge Sink Connector for Kafka Connect to push messages into EventBridge.

My Kafka message looks like this:

{
“headers”: {
“country”: “US”,
“channel”: “REDACTED_CHANNEL”,
“processType”: “REDACTED_PROCESS_TYPE”
},
“payload”: {
“date”: “01/01/2025”,
“address”: “JOE ST 00000”,
“customerId”: “REDACTED_CUST_ID”,
“accountNumber”: “REDACTED_ACC_NUM”
}
}

But in EventBridge, it appears wrapped inside a value field like this:

“detail”: {
“topic”: “test_topic2”,
“partition”: “1”,
“offset”: “28”,
“timestamp”: “1749142570349”,
“timestampType”: “CreateTime”,
“key”: “18”,
“value”: {
“headers”: { … },
“payload”: { … }
}
}

What I want instead:

“detail”: {
“topic”: “test_topic2”,
“partition”: “1”,
“offset”: “28”,
“timestamp”: “1749142570349”,
“timestampType”: “CreateTime”,
“key”: “18”,
“headers”: { … },
“payload”: { … }
}

Connector Config:

{
“name”: “EventBridge_POC”,
“connector.class”: “software.amazon.event.kafkaconnector.EventBridgeSinkConnector”,
“topics”: “test_topic2”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“value.converter.schemas.enable”: “false”,
“aws.eventbridge.source”: “gravity”,
“aws.eventbridge.detail.types”: “gravity.Undate”,

}

Is there a way to flatten the Kafka message or re-map the structure so that headers and payload aren’t inside value, but are instead top-level fields within the detail section in EventBridge?

Any help would be appreciated!

I think that you’d need to write a custom SMT to do this since that structure comes from the connector itself and doesn’t look to be configurable there (see here). The ExtractNestedField SMT here might work. Try that with input.inner.field.name = headers, input.outer.field.name = value, and output.field.name = headers. If that works then you’d want to specify it a second time for the nested payload field. I’m skeptical that this will work since I think you are technically trying to get fields two levels deep to one level deep within detail and this SMT looks like it only supports pulling a field nested one level up to the root. You could start from the source code here if that’s the case.