Hi, I’m using Debezium Mysql Source connector and Neo4j Sink connector that are working great together when creating nodes, but I’m facing some difficulties when updating and deleting nodes. I followed neo4j’s tutorial and shows only one example creating a node. What if I need to run a different cypher query when updating or deleting rows in mysql table? Here’s my config for the connectors
Mysql debezium
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "1",
"topic.prefix": "dbserver1",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"database.allowPublicKeyRetrieval":"true"
}
}
Neo4j :
{
"name": "Neo4jSinkConnector",
"config": {
"topics": "dbserver1.inventory.customers",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"errors.retry.timeout": "-1",
"errors.retry.delay.max.ms": "1000",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"neo4j.server.uri": "neo4j+s://01f8dae8.databases.neo4j.io:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.topic.cypher.dbserver1.inventory.customers": "MERGE (p:Person{name: event.after.nombre, id: event.after.id})"
}
}