Hi all,
Hope someone can shed some light on this.
Please see the attached files.
The children-address sink works: the Address nodes are being created.
That sink is defined in ‘create_children_address_node_sink.json’, which is posted by ‘create_node_sink_children_address.sh’.
The Children nodes themselves, however, are not being created.
That sink is defined in ‘create_children_node_sink.json’, which is posted by ‘create_node_sink_children.sh’.
Both are attached, along with a copy of the payload in children.json.
Example of children payload
{
"_id": "96d35eb2-dc7f-40df-8128-30c58b250692",
"dob": "19/03/28",
"name": "Shaun",
"gender": "Male",
"address": {
"town": "Galway City",
"county": "Galway",
"country": "Ireland",
"province": "Connacht",
"street_1": "99 Fresh Street Street",
"street_2": "",
"parcel_id": "H91 Y9P7-22470",
"postal_code": "H91 Y9P7",
"country_code": "IE",
"neighbourhood": "Salthill"
},
"surname": "Doudigan",
"nationalid": "0002003P",
"family_id": "4e6f3e02-91ac-42f9-a518-408e780a7c7b",
"father_nationalid": "7934317B",
"mother_nationalid": "0181947G"
}
create_children_node_sink.json
{
"name": "neo4j-children-node-sink",
"config": {
"connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"topics": "children",
"neo4j.uri": "bolt://neo4j:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "dbpassword",
"neo4j.cypher.topic.children": "MERGE (t:Children {nationalid: event.nationalid,}) ON CREATE SET t += {nationalid: event.nationalid, _id: event._id, name: event.name, surname: event.surname, gender: event.gender, dob: event.dob, family_id: event.family_id, father_nationalid: event.father_nationalid, mother_nationalid: event.mother_nationalid, parcel_id: event.address.parcel_id} ON MATCH SET t += {name: event.name, surname: event.surname, gender: event.gender, dob: event.dob, family_id: event.family_id, father_nationalid: event.father_nationalid, mother_nationalid: event.mother_nationalid, parcel_id: event.address.parcel_id}",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"tasks.max": "2",
"neo4j.batch.size": "1000",
"neo4j.batch.timeout.msecs": "5000",
"neo4j.retry.backoff.msecs": "3000",
"neo4j.retry.max.attemps": "5"
}
}
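A possible cause, not confirmed by the logs shown here: the children statement above has a comma directly before the closing brace in `{nationalid: event.nationalid,}`, and Cypher map literals do not accept a trailing comma, so the statement may be rejected as a syntax error. The address statement has no such comma. The sketch below is one way to spot this kind of slip in a config file; `find_trailing_commas` is a hypothetical helper name, not part of any tool.

```shell
#!/bin/bash
# Sketch: list places where a comma is directly followed by a closing brace.
# Valid JSON never contains ",}" outside a string value, so in a connector
# config any hit is most likely inside an embedded Cypher statement.
# find_trailing_commas is a hypothetical helper name.
find_trailing_commas() {
  local file="$1"
  # -n prefixes each hit with its line number, -o prints only the matched text
  grep -noE ',[[:space:]]*\}' "$file"
}
```

Running `find_trailing_commas create_children_node_sink.json` should report the `neo4j.cypher.topic.children` line, while the address config should produce no output. If that is the problem, changing the pattern to `MERGE (t:Children {nationalid: event.nationalid})` and re-creating the connector would be the thing to try.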
create_node_sink_children.sh
#!/bin/bash
# Register the children node sink with the Kafka Connect REST API.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_children_node_sink.json
# Show the connector and task status.
curl -s http://localhost:8083/connectors/neo4j-children-node-sink/status | jq '.'
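The status call above returns one entry per task, and a failed task normally carries its exception in a `trace` field. Since the full error is too long to post, a minimal sketch like the following could cut it down to the exception lines. It assumes `jq` is installed (it is already used above); `print_failed_tasks` is a hypothetical helper name.

```shell
#!/bin/bash
# Sketch: read a Kafka Connect status document on stdin and, for each FAILED
# task, print its id followed by the exception and "Caused by" lines of its
# trace. Stack frames ("at ...") and "... N more" lines are dropped.
# print_failed_tasks is a hypothetical helper name.
print_failed_tasks() {
  jq -r '.tasks[]
    | select(.state == "FAILED")
    | "task \(.id): \(.state)",
      ((.trace // "(no trace reported)")
        | split("\n")
        | map(select(test("^\\s+(at |\\.\\.\\.)") | not))
        | .[])'
}
```

It could be used as `curl -s http://localhost:8083/connectors/neo4j-children-node-sink/status | print_failed_tasks`. If all tasks report RUNNING it prints nothing, which would suggest the failure is not at the task level.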
create_children_address_node_sink.json
{
"name": "neo4j-children-address-node-sink",
"config": {
"connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"topics": "children",
"neo4j.uri": "bolt://neo4j:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "dbpassword",
"neo4j.cypher.topic.children": "MERGE (t:Address {parcel_id: event.address.parcel_id}) ON CREATE SET t += {parcel_id: event.address.parcel_id, street_1: event.address.street_1, street_2: event.address.street_2, town: event.address.town, county: event.address.county, province: event.address.province, country: event.address.country, postal_code: event.address.postal_code, country_code: event.address.country_code, neighbourhood: event.address.neighbourhood} ON MATCH SET t += { street_1: event.address.street_1, street_2: event.address.street_2, town: event.address.town, county: event.address.county, province: event.address.province, country: event.address.country, postal_code: event.address.postal_code, country_code: event.address.country_code, neighbourhood: event.address.neighbourhood}",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"tasks.max": "2",
"neo4j.batch.size": "1000",
"neo4j.batch.timeout.msecs": "5000",
"neo4j.retry.backoff.msecs": "3000",
"neo4j.retry.max.attemps": "5"
}
}
create_node_sink_children_address.sh
#!/bin/bash
# Register the children address node sink with the Kafka Connect REST API.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_children_address_node_sink.json
# Show the connector and task status.
curl -s http://localhost:8083/connectors/neo4j-children-address-node-sink/status | jq '.'
err.txt
I can’t paste the full error because it is too long, and the forum won’t let me upload a .txt or log file, so this was the only option.
This all runs on Flink 1.20.2 (Scala 2.12) with Java 17.
docker logs -f connect |grep error
...
[2025-09-27 11:25:00,744] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 1 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2025-09-30 13:17:10,031] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 2 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=75, connectorIds=[neo4j-children-node-sink], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
[2025-09-30 13:17:10,089] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=78, connectorIds=[neo4j-children-node-sink], taskIds=[neo4j-children-node-sink-0, neo4j-children-node-sink-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0
...
[2025-09-30 13:28:02,553] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=81, connectorIds=[], taskIds=[], revokedConnectorIds=[neo4j-children-node-sink], revokedTaskIds=[neo4j-children-node-sink-0, neo4j-children-node-sink-1], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2025-09-30 13:28:02,556] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=81, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2025-09-30 13:28:09,797] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 6 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=82, connectorIds=[neo4j-children-node-sink], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0
...
[2025-09-30 13:28:09,809] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 7 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=85, connectorIds=[neo4j-children-node-sink], taskIds=[neo4j-children-node-sink-0, neo4j-children-node-sink-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
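Every line in the excerpt above is either an INFO line that contains `error=0` or an `errors.*` config key, so `grep error` is not surfacing any real failure here. A minimal sketch that filters on the log level instead is shown below, assuming the worker log keeps the `[timestamp] LEVEL message` layout seen above; `keep_warn_and_error` is a hypothetical helper name.

```shell
#!/bin/bash
# Sketch: read log lines on stdin and keep only those whose level field,
# right after the [timestamp], is WARN or ERROR. The optional first argument
# is the number of following lines to keep as well (useful for stack traces).
# keep_warn_and_error is a hypothetical helper name.
keep_warn_and_error() {
  local context="${1:-0}"
  grep -E -A "$context" '^\[[^]]+\] (WARN|ERROR) '
}
```

For example, `docker logs connect 2>&1 | keep_warn_and_error 20` would show each WARN or ERROR line plus the 20 lines after it. The `2>&1` is there because `docker logs` replays the container's stderr on stderr, which a plain pipe would not pass to `grep`.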