Is there any configuration available for existing enterprise connectors to collate the data (coming from the source, before it goes to the Kafka topic) into small batches (technically, micro-batches)?

Problem statement: I need to do some reconciliation for streaming data received via Confluent Kafka. My idea was to create a micro-batch ID for the streaming data before it is pushed to Kafka topics, and to get metadata separately from the source. Is there any configuration available for existing connectors to add a micro-batch ID to the messages, tagging them based on a certain duration, so that I can reconcile them against the metadata received from the source?

Or do we have to go with custom code for producers and consumers?

There isn’t a connector that I know of that will satisfy your need.

I’ve worked with folks to make this happen. Connect isn’t really the place to do this. Connect’s ability to do logic isn’t as full-featured as a stream processor’s. It’s better to have a separate stream processor for reconciliation.

Your other option is to modify the source connector to produce the IDs that you need, and then have a separate task read those and implement your logic.
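
To make the reconciliation step concrete, here is a minimal Python sketch of the logic such a separate task could run once records carry a micro-batch ID. It is not a Kafka consumer: records are plain dicts, and the field name `microbatch_id` and the shape of the source metadata (an expected record count per batch) are assumptions for illustration only.

```python
from collections import Counter


def reconcile(records, expected_counts, id_field="microbatch_id"):
    """Compare per-batch record counts against counts reported by the source.

    Returns a dict mapping batch id -> (expected, actual) for every batch
    where the two numbers differ, including batches seen on only one side.
    """
    # Count how many records actually arrived for each batch id.
    actual_counts = Counter(r[id_field] for r in records)
    mismatches = {}
    # Walk the union of ids so missing and unexpected batches both show up.
    for batch_id in set(expected_counts) | set(actual_counts):
        expected = expected_counts.get(batch_id, 0)
        actual = actual_counts.get(batch_id, 0)
        if expected != actual:
            mismatches[batch_id] = (expected, actual)
    return mismatches
```

An empty result means every batch matched the source metadata; anything else lists the batches to investigate.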

Thanks @mitchell-h for your response.

When you say modify the source connector to produce the IDs, is there any way to do that? I couldn’t find any configuration that lets us modify it. If you could provide some examples of how to do that, that would be really great. I want a way, for example with the source connector for Salesforce, to batch a set of messages together and tag them with a micro-batch ID and an ETL timestamp.

That depends on the connector: some have the source code available, some do not. Also, some have a license that allows you to modify the source, some do not. Your best bet is to check Confluent Hub for the connector you want to modify and find out whether the source is there and modifiable.


If I understand “add some Micro batch id to the messages to tag them based on certain duration” correctly as “add some additional information to the value or key of the record”, then I think there is no need to modify a connector.
In that case SMTs (Single Message Transforms; see “Get started with Single Message Transforms for self-managed connectors” in the Confluent documentation) can do the job for you.

For example, the Common Transformations from the Kafka Connect Connectors documentation, and more specifically TimestampNowField, can be used to add a timestamp to the message value or to the key.
==> That would be part of your Kafka connector configuration, without any need to touch the code.
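
Once every record carries such a timestamp, a downstream consumer can derive a micro-batch ID from it by flooring the timestamp to a fixed window. A minimal Python sketch, assuming the timestamp arrives as epoch milliseconds and using a hypothetical 60-second window; this is consumer-side logic for illustration, not something the SMT does by itself.

```python
def microbatch_id(timestamp_ms, window_ms=60_000):
    """Return the start (epoch ms) of the fixed window containing timestamp_ms.

    All records whose timestamps fall in the same window share one id.
    """
    return (timestamp_ms // window_ms) * window_ms
```

With the default window, timestamps 60000 through 119999 all map to the same ID, so records stamped within the same minute can be reconciled as one batch.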

If you do not find something ready to use for your purpose in the Kafka ecosystem, you can also write your own SMT quite easily.

If you want to learn more about SMTs, I can recommend the “Twelve Days of SMT” series from rmoff. His blog seems to lack tags, so go to it and search for the series ^^