Hi,
I have been working for a little while with a Kafka sink connector to populate my time series collections.
The config is:
{
  "name": "mongo-events-sink",
  "config" : {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "2",
    "topics": "mydb.events",
    "connection.uri": "mongodb://writer:<passwd>@rs-1-1:27017/?directConnection=true&appName=mongosh+2.2.5",
    "database": "mydb",
    "collection": "events",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "false",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
    "document.id.strategy.overwrite.existing": "true",
    "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.InsertOneDefaultStrategy",
    "delete.on.null.values": "false",
    "timeseries.timefield": "_timestamp",
    "timeseries.metafield": "_metadata",
    "timeseries.timefield.auto.convert": true
  }
}
My collection is created with:
db = db.getSiblingDB("mydb")
db.createCollection(
  "mydb.events",
  {
    timeseries: {
      timeField: "_timestamp",
      metaField: "_metadata",
      granularity: "seconds"
    },
  }
)
db.events.createIndex( { "_metadata.eventId": 1, "_timestamp": 1} )
db = db.getSiblingDB("admin")
db.grantRolesToUser("reader", [{"role":"read", "db": "mydb"}])
db.grantRolesToUser("writer", [{"role":"readWrite", "db": "mydb"}])
This configuration had been working until recently, but I had to restart my MongoDB replica set and Kafka instance to apply a new config, and the connector has not worked since (even though, as far as I can tell, nothing has changed on the config side).
When I run:
curl -X POST -H "Content-Type: application/json" --data @config/mongo-mydb-events-sink-config.json http://localhost:8083/connectors -w "\n"
I get:
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nA collection already exists for: mydb.events that is not a timeseries collection.\nYou can also find the above list of errors at the endpoint /connector-plugins/{connectorType}/config/validate"}
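Since the error says the target collection exists but is not a time series collection, here is a minimal sketch of how the collection types could be checked from mongosh. It relies on `db.getCollectionInfos()`, which reports a `type` of `"timeseries"` for time series collections. `collectionKind` is a hypothetical helper name of my own, not part of any MongoDB API. The creation script above passes `"mydb.events"` as the collection name while the connector targets `events`, so it may be worth checking both names.

```javascript
// Hypothetical helper (not a MongoDB API): classify a collection from
// listCollections-style output, e.g. the array returned by
// db.getCollectionInfos() in mongosh.
function collectionKind(infos, name) {
  // Each entry is expected to carry at least { name, type }.
  const info = infos.find((c) => c.name === name);
  if (!info) return "missing";
  return info.type === "timeseries" ? "timeseries" : "not-timeseries";
}

// Usage in mongosh (assumes a user with read access on mydb):
//   const infos = db.getSiblingDB("mydb").getCollectionInfos();
//   print(collectionKind(infos, "events"));       // the name the connector writes to
//   print(collectionKind(infos, "mydb.events"));  // the name passed to createCollection
```

If `events` comes back as `not-timeseries` or `missing`, that would at least be consistent with the validation error, though this is only a diagnostic sketch and not a confirmed explanation.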