Hi All,
I have created two Cosmos DB source connectors in a single Kafka Connect instance:
1. create-user-cosmosdb-source-connector
This fetches records from the Cosmos DB UserDb-->User
container and stores them in the "create_user"
Kafka topic if the property OperationType is 1; otherwise the record is ignored.
JSON config:
{
  "name": "create-user-cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "true",
    "connect.cosmos.task.poll.interval": "1000",
    "connect.cosmos.connection.endpoint": "https://cosmos-instance.documents.azure.com:443/",
    "connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "connect.cosmos.databasename": "UserDb",
    "connect.cosmos.containers.topicmap": "create_user#User",
    "connect.cosmos.offset.useLatest": true,
    "topic.creation.enable": "false",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "transforms": "createIncludeFilter,replacefield",
    "transforms.createIncludeFilter.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.createIncludeFilter.filter.condition": "$.[?(@.OperationType == 1)]",
    "transforms.createIncludeFilter.filter.type": "include",
    "transforms.createIncludeFilter.missing.or.null.behavior": "fail",
    "transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.replacefield.exclude": "_rid,_self,_etag,_attachments,_ts",
    "transforms.replacefield.include": "id,Login,Email,Password,Name,Country,OperationType"
  }
}
2. update-user-cosmosdb-source-connector
This fetches records from the Cosmos DB UserDb-->User
container and stores them in the "update_user"
Kafka topic if the property OperationType is 2; otherwise the record is ignored.
JSON config:
{
  "name": "update-user-cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "true",
    "connect.cosmos.task.poll.interval": "1000",
    "connect.cosmos.connection.endpoint": "https://cosmos-instance.documents.azure.com:443/",
    "connect.cosmos.master.key": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==",
    "connect.cosmos.databasename": "UserDb",
    "connect.cosmos.containers.topicmap": "update_user#User",
    "connect.cosmos.offset.useLatest": true,
    "topic.creation.enable": "false",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "transforms": "updateIncludeFilter,replacefield",
    "transforms.updateIncludeFilter.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.updateIncludeFilter.filter.condition": "$.[?(@.OperationType == 2)]",
    "transforms.updateIncludeFilter.filter.type": "include",
    "transforms.updateIncludeFilter.missing.or.null.behavior": "fail",
    "transforms.replacefield.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.replacefield.exclude": "_rid,_self,_etag,_attachments,_ts",
    "transforms.replacefield.include": "id,Login,Email,Password,Name,Country,OperationType"
  }
}
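To make the intended routing concrete, here is a rough Python sketch of what the two filter conditions are supposed to achieve together. It does not touch Kafka Connect at all: the route function and the sample records are made up for illustration, and raising an error on a missing OperationType is only my reading of "missing.or.null.behavior": "fail".

```python
# Illustration only: mimics the intent of the two Filter$Value conditions.
# route() and the sample records are hypothetical, not connector code.

def route(record):
    """Return the topic a change-feed record is meant for, or None if dropped."""
    op = record.get("OperationType")
    if op is None:
        # Assumed to mirror "missing.or.null.behavior": "fail".
        raise ValueError("OperationType is missing or null")
    if op == 1:
        return "create_user"   # kept by createIncludeFilter
    if op == 2:
        return "update_user"   # kept by updateIncludeFilter
    return None                # matches neither filter, so it is ignored


samples = [
    {"id": "1", "Login": "alice", "OperationType": 1},
    {"id": "1", "Login": "alice", "OperationType": 2},
    {"id": "2", "Login": "bob", "OperationType": 3},
]

routed = [route(r) for r in samples]
print(routed)
```

For this to work, every change in the User container has to be seen by both connectors, so that each one can apply its own filter.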
But when both connectors run in parallel, they share the same continuation token in the Cosmos lease container (i.e. User-leases). As a result, each message is processed by only one connector: the one whose worker id is recorded as Owner in the User-leases container.
Record present in the User-leases container:
{
"id": "UserUserDb.cosmos-instance.documents.azure.com_UserDb_User..0",
"_etag": "\"00000000-0000-0000-9c3b-3307afb701d8\"",
"LeaseToken": "0",
"ContinuationToken": "\"77\"",
"timestamp": "2022-07-20T13:18:28.053232Z",
"Owner": "worker-3971065-0",
"_rid": "9Y8kAO3czHkCAAAAAAAAAA==",
"_self": "dbs/9Y8kAA==/colls/9Y8kAO3czHk=/docs/9Y8kAO3czHkCAAAAAAAAAA==/",
"_attachments": "attachments/",
"_ts": 1658323108
}
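The id of this lease document seems to be built only from the container, database, account host and lease token, with nothing connector-specific in it. The Python sketch below is an assumption reverse-engineered from the id above (lease_id is a hypothetical helper, not a connector API); it shows why both connectors would end up on the same lease record.

```python
# Assumption: the lease id format is guessed from the single record shown above.
# lease_id() is a hypothetical helper, not part of the Cosmos DB connector.

def lease_id(container, database, host, lease_token):
    return f"{container}{database}.{host}_{database}_{container}..{lease_token}"


observed = "UserUserDb.cosmos-instance.documents.azure.com_UserDb_User..0"

connectors = [
    "create-user-cosmosdb-source-connector",
    "update-user-cosmosdb-source-connector",
]

# Both connectors read the same container in the same database and account,
# so the guessed id comes out identical for each of them.
ids = {
    name: lease_id("User", "UserDb", "cosmos-instance.documents.azure.com", "0")
    for name in connectors
}

for name, value in ids.items():
    print(name, "->", value, "| matches observed:", value == observed)

print("distinct lease ids:", len(set(ids.values())))
```

If that guess is right, there is only one lease id for the two connectors, which would explain the single ContinuationToken and Owner.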
How can I get a separate record in the User-leases container for each Cosmos DB source connector, so that each one maintains its own ContinuationToken
and Owner (i.e. worker)?
Thanks,
Saurabh.