Hi all, I’m working on a Kafka Connect CDC pipeline that uses MongoDB source connectors and an Elasticsearch sink connector. I’m developing in a containerized environment for now, but will be moving to managed services for production. I’ve created the MongoDB source and the Elasticsearch sink with what I hope are basic configurations.

The sink connector is behaving strangely. The first message through the broker is written to Elasticsearch as expected. The second message causes the connector to fail with null pointer exceptions. It may actually have failed at the end of the previous transaction, but the connector status doesn’t show any errors until the new message is sent.

This is where things get weird: I can stack up more updates on the topic, then delete the sink connector and PUT it back, and all the events on the topic are consumed and indexed as expected. Any additional events then cause the null pointer exception again.

My goal is to implement some transformations on the sink to support future Mongo-to-Mongo pipelines. My questions are:
- Is this a typical use case (MongoDB → Elasticsearch)?
- Is applying transforms on the sink, to support future Mongo → Mongo pipelines, a sound idea?
- I think I want schemaless JSON messages. Is this possible? I understand the value of using a schema, but I’d like to use a loose JSON object schema in the local dev environment and implement schema management once deployed.
- Any advice on how to tune the connector configs is greatly appreciated.
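To make the schemaless question concrete, here is how I understand the converter side. With `value.converter.schemas.enable` set to true, `JsonConverter` expects every message to be an envelope with exactly a `schema` field and a `payload` field; with it set to false (as in my sink config), the message body is just the JSON document. Below is a simplified Python model of that rule only. `decode_connect_json` is a hypothetical helper, not the converter’s real code, and the sample document fields are made up.

```python
import json


def decode_connect_json(raw: bytes, schemas_enable: bool):
    """Hypothetical, simplified model of JsonConverter's envelope rule.

    It only checks the message shape; the real converter also converts
    the payload according to the embedded schema.
    """
    doc = json.loads(raw)
    if not schemas_enable:
        # schemas.enable=false: the message body *is* the value (schemaless JSON).
        return doc
    # schemas.enable=true: the body must be {"schema": ..., "payload": ...} and nothing else.
    if not isinstance(doc, dict) or set(doc) != {"schema", "payload"}:
        raise ValueError('schemas.enable=true requires exactly "schema" and "payload" fields')
    return doc["payload"]


# Hypothetical sample documents.
plain = b'{"firstName": "Ada", "roles": ["mentor"]}'
enveloped = json.dumps({
    "schema": {"type": "struct", "optional": False,
               "fields": [{"field": "firstName", "type": "string", "optional": True}]},
    "payload": {"firstName": "Ada"},
}).encode("utf-8")

print(decode_connect_json(plain, schemas_enable=False))
print(decode_connect_json(enveloped, schemas_enable=True))
```

If this model is right, a loose JSON object in dev just means `schemas.enable=false` on both ends, with schema management (envelope or a schema registry) added later.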
For reference, this is my source config:
{
  "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "tasks.max": "1",
  "connection.uri": "mongodb://mongodb:27017/?replicaSet=rs0",
  "database": "mentorHub",
  "collection": "people",
  "output.format.value": "json"
}
And this is the sink connector config:
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max": "1",
  "topics": "mentorHub.people",
  "key.ignore": true,
  "schema.ignore": true,
  "connection.url": "http://elasticsearch:9200",
  "type.name": "_doc",
  "name": "sink-elasticsearch-people",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": false,
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false
}
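The delete/PUT workaround described above can be scripted against the Kafka Connect REST API. This is a minimal Python sketch of that cycle, plus the status and single-task restart endpoints, which may help capture the full stack trace of the null pointer exception without recreating the connector. Assumptions: the worker’s REST listener is at `http://localhost:8083` (8083 is Connect’s default port; in a container setup it is probably a service name instead), and the helper names (`connector_request`, `send`, `status`, `restart_task`, `recreate`) are my own, not part of any library.

```python
import json
import urllib.error
import urllib.request

# Assumption: where the Connect worker's REST API is reachable; adjust for your containers.
CONNECT_URL = "http://localhost:8083"

# The sink config from above, as a Python dict (json.dumps turns True/False into true/false).
SINK_CONFIG = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "mentorHub.people",
    "key.ignore": True,
    "schema.ignore": True,
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "name": "sink-elasticsearch-people",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": False,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": False,
}


def connector_request(name, suffix="", method="GET", body=None):
    """Build (but do not send) a request against /connectors/{name}{suffix}."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    req = urllib.request.Request(f"{CONNECT_URL}/connectors/{name}{suffix}", data=data, method=method)
    if data is not None:
        req.add_header("Content-Type", "application/json")
    return req


def send(req):
    """Send a request and decode the JSON response; empty bodies (e.g. 204) become None."""
    with urllib.request.urlopen(req) as resp:
        raw = resp.read()
    return json.loads(raw) if raw else None


def status(name):
    # GET /connectors/{name}/status: connector and task states, with a trace for FAILED tasks.
    return send(connector_request(name, "/status"))


def restart_task(name, task_id=0):
    # POST /connectors/{name}/tasks/{id}/restart: restart one task without deleting the connector.
    return send(connector_request(name, f"/tasks/{task_id}/restart", method="POST"))


def recreate(config):
    # The manual workaround: DELETE the connector, then PUT its config back.
    name = config["name"]
    try:
        send(connector_request(name, method="DELETE"))
    except urllib.error.HTTPError as err:
        if err.code != 404:  # 404 only means the connector did not exist yet
            raise
    # PUT /connectors/{name}/config creates the connector, or updates it if it exists.
    return send(connector_request(name, "/config", method="PUT", body=config))
```

With a running worker, `status("sink-elasticsearch-people")` right after the second message should show whether the task is FAILED and include its trace, and `restart_task(...)` is a lighter alternative to `recreate(SINK_CONFIG)`.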