How to run multiple cosmos source connectors for same Container with different transform configurations in Kafka connect

Hi All,

I have created two Cosmos DB source connectors in a single Kafka connect instance:

1. create-user-cosmosdb-source-connector
This fetches records from the Cosmos UserDb --> User container and writes them to the "create_user" Kafka topic when the property OperationType is 1; otherwise the record is ignored.

JSON config:

{
  "name": "create-user-cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "true",
    "connect.cosmos.task.poll.interval": "1000",
    "connect.cosmos.connection.endpoint": "https://cosmos-instance.documents.azure.com:443/",
    "connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "connect.cosmos.databasename": "UserDb",
    "connect.cosmos.containers.topicmap": "create_user#User",
    "connect.cosmos.offset.useLatest": true,
    "topic.creation.enable": "false",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "transforms": "createIncludeFilter,replacefield",
    "transforms.createIncludeFilter.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.createIncludeFilter.filter.condition": "$.[?(@.OperationType == 1)]",
    "transforms.createIncludeFilter.filter.type": "include", 
    "transforms.createIncludeFilter.missing.or.null.behavior": "fail",
    "transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.replacefield.exclude": "_rid,_self,_etag,_attachments,_ts",
    "transforms.replacefield.include": "id,Login,Email,Password,Name,Country,OperationType"
  }
}
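To make the intended behavior of the transform chain concrete, here is a minimal Python sketch of what the `createIncludeFilter` + `replacefield` pair does to a document. This is an illustration only, not connector code; the sample record is hypothetical, but the field names mirror the config above.

```python
# System fields excluded by the ReplaceField$Value transform in the config.
EXCLUDE = {"_rid", "_self", "_etag", "_attachments", "_ts"}

def apply_transforms(doc):
    """Return the value that would reach the create_user topic, or None if dropped."""
    # Filter$Value with filter.type=include keeps only records matching
    # the JSONPath predicate $.[?(@.OperationType == 1)].
    if doc.get("OperationType") != 1:
        return None
    # ReplaceField$Value then drops the excluded Cosmos system fields.
    return {k: v for k, v in doc.items() if k not in EXCLUDE}

sample = {
    "id": "42", "Login": "jdoe", "Email": "jdoe@example.com",
    "OperationType": 1, "_rid": "abc==", "_ts": 1658323108,
}
print(apply_transforms(sample))
# → {'id': '42', 'Login': 'jdoe', 'Email': 'jdoe@example.com', 'OperationType': 1}
```

The second connector below is identical except that its filter matches OperationType == 2 and it targets the update_user topic.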

2. update-user-cosmosdb-source-connector
This fetches records from the same Cosmos UserDb --> User container and writes them to the "update_user" Kafka topic when the property OperationType is 2; otherwise the record is ignored.

JSON config:

{
  "name": "update-user-cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "true",
    "connect.cosmos.task.poll.interval": "1000",
    "connect.cosmos.connection.endpoint": "https://cosmos-instance.documents.azure.com:443/",
    "connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "connect.cosmos.databasename": "UserDb",
    "connect.cosmos.containers.topicmap": "update_user#User",
    "connect.cosmos.offset.useLatest": true,
    "topic.creation.enable": "false",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "transforms": "updateIncludeFilter,replacefield",
    "transforms.updateIncludeFilter.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.updateIncludeFilter.filter.condition": "$.[?(@.OperationType == 2)]",
    "transforms.updateIncludeFilter.filter.type": "include", 
    "transforms.updateIncludeFilter.missing.or.null.behavior": "fail",
    "transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.replacefield.exclude": "_rid,_self,_etag,_attachments,_ts",
    "transforms.replacefield.include": "id,Login,Email,Password,Name,Country,OperationType"
  }
}

But when both connectors run in parallel, they share the same lease document (and hence the same continuation token) in the Cosmos lease container (i.e. User-leases). As a result, each change-feed event is processed by only one connector: the one whose worker id is recorded as the Owner in the lease document.

Record present in the User-leases container:

{
    "id": "UserUserDb.cosmos-instance.documents.azure.com_UserDb_User..0",
    "_etag": "\"00000000-0000-0000-9c3b-3307afb701d8\"",
    "LeaseToken": "0",
    "ContinuationToken": "\"77\"",
    "timestamp": "2022-07-20T13:18:28.053232Z",
    "Owner": "worker-3971065-0",
    "_rid": "9Y8kAO3czHkCAAAAAAAAAA==",
    "_self": "dbs/9Y8kAA==/colls/9Y8kAO3czHk=/docs/9Y8kAO3czHkCAAAAAAAAAA==/",
    "_attachments": "attachments/",
    "_ts": 1658323108
}
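The lease document id appears to be built only from the monitored container's endpoint host, database, and container name, with no connector name in it, which would explain why both connectors resolve to the same lease record. A small sketch decomposing the id string from the record above (an observation about the string itself, not connector internals):

```python
# Decompose the lease document id shown in the User-leases record above.
lease_id = "UserUserDb.cosmos-instance.documents.azure.com_UserDb_User..0"

prefix, rest = lease_id.split(".", 1)            # "UserUserDb" (container + db name)
host, database, tail = rest.split("_")           # endpoint host, "UserDb", "User..0"
container, _, lease_token = tail.partition("..") # "User", "0"

print(prefix, host, database, container, lease_token)
# Every component is derived from the monitored database/container, so a
# second connector watching the same container maps to the same lease id.
```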

How can each Cosmos DB source connector get its own record in the User-leases container, so that each maintains its own ContinuationToken and Owner (i.e. worker)?

Thanks,
Saurabh.

Hi Team,

Please reply if anyone has a solution to this problem, or knows another way to achieve our requirement:
we want to store Create and Update events from Cosmos DB in different Kafka topics using Kafka Connect.
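One possible workaround (a sketch, not a tested solution): run a single source connector that writes all change-feed events to one intermediate topic, then split that topic downstream by OperationType, for example with a small consumer/producer process or a Kafka Streams branch. The routing decision itself is trivial; the topic names below match the ones in our configs, and everything else here is hypothetical:

```python
from typing import Optional

def route(record: dict) -> Optional[str]:
    """Pick the destination topic for a change-feed event, or None to drop it.

    Mirrors the two include-filter conditions from the connector configs:
    OperationType 1 -> create_user, OperationType 2 -> update_user.
    """
    op = record.get("OperationType")
    if op == 1:
        return "create_user"
    if op == 2:
        return "update_user"
    return None  # other operation types are ignored, as in the filter configs
```

In a real deployment this function would sit inside a consumer loop (e.g. a kafka-python or Kafka Streams application) that reads the intermediate topic and produces each record to the returned topic. Since only one source connector then reads the change feed, only one lease record exists and the continuation-token conflict disappears.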
