Stuck with sink error

Hi all.

Hope someone can shed some light.

Please see attached.

The children-address sink works; the Address nodes are being created.
It is defined in ‘create_children_address_node_sink.json’, called by ‘create_node_sink_children_address.sh’.

But the Children nodes themselves are not being created.
That sink is defined in ‘create_children_node_sink.json’, called by ‘create_node_sink_children.sh’.
See attached; also attached is a copy of the payload in children.json.

G

Example of children payload

{
  "_id": "96d35eb2-dc7f-40df-8128-30c58b250692",
  "dob": "19/03/28",
  "name": "Shaun",
  "gender": "Male",
  "address": {
    "town": "Galway City",
    "county": "Galway",
    "country": "Ireland",
    "province": "Connacht",
    "street_1": "99 Fresh Street Street",
    "street_2": "",
    "parcel_id": "H91 Y9P7-22470",
    "postal_code": "H91 Y9P7",
    "country_code": "IE",
    "neighbourhood": "Salthill"
  },
  "surname": "Doudigan",
  "nationalid": "0002003P",
  "family_id": "4e6f3e02-91ac-42f9-a518-408e780a7c7b",
  "father_nationalid": "7934317B",
  "mother_nationalid": "0181947G"
}

create_children_node_sink.json

{
  "name": "neo4j-children-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "children",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.children": "MERGE (t:Children {nationalid: event.nationalid,}) ON CREATE SET t += {nationalid: event.nationalid, _id: event._id, name: event.name, surname: event.surname, gender: event.gender, dob: event.dob, family_id: event.family_id, father_nationalid: event.father_nationalid, mother_nationalid: event.mother_nationalid, parcel_id: event.address.parcel_id} ON MATCH SET t += {name: event.name, surname: event.surname, gender: event.gender, dob: event.dob, family_id: event.family_id, father_nationalid: event.father_nationalid, mother_nationalid: event.mother_nationalid, parcel_id: event.address.parcel_id}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}

create_node_sink_children.sh

#!/bin/bash

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_children_node_sink.json

curl -s http://localhost:8083/connectors/neo4j-children-node-sink/status | jq '.'

create_children_address_node_sink.json

{
  "name": "neo4j-children-address-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "children",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.children": "MERGE (t:Address {parcel_id: event.address.parcel_id}) ON CREATE SET t += {parcel_id: event.address.parcel_id, street_1: event.address.street_1, street_2: event.address.street_2, town: event.address.town, county: event.address.county, province: event.address.province, country: event.address.country, postal_code: event.address.postal_code, country_code: event.address.country_code, neighbourhood: event.address.neighbourhood} ON MATCH SET t += { street_1: event.address.street_1, street_2: event.address.street_2, town: event.address.town, county: event.address.county, province: event.address.province, country: event.address.country, postal_code: event.address.postal_code, country_code: event.address.country_code, neighbourhood: event.address.neighbourhood}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}

create_node_sink_children_address.sh

#!/bin/bash


curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_children_address_node_sink.json


curl -s http://localhost:8083/connectors/neo4j-children-address-node-sink/status | jq '.'

err.txt

I can’t paste the error because it is too long, and the forum also won’t let me upload a .txt or .log file, so the above was the only option.

This is all on Flink 1.20.2, Scala 2.12, with Java 17.

docker logs -f connect |grep error

...

[2025-09-27 11:25:00,744] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 1 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2025-09-30 13:17:10,031] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 2 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=75, connectorIds=[neo4j-children-node-sink], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
	errors.deadletterqueue.context.headers.enable = false
	errors.deadletterqueue.topic.name = 
	errors.deadletterqueue.topic.replication.factor = 3
	errors.log.enable = false
	errors.log.include.messages = false
	errors.retry.delay.max.ms = 60000
	errors.retry.timeout = 0
	errors.tolerance = none
	errors.deadletterqueue.context.headers.enable = false
	errors.deadletterqueue.topic.name = 
	errors.deadletterqueue.topic.replication.factor = 3
	errors.log.enable = false
	errors.log.include.messages = false
	errors.retry.delay.max.ms = 60000
	errors.retry.timeout = 0
	errors.tolerance = none
	errors.deadletterqueue.context.headers.enable = false
	errors.deadletterqueue.topic.name = 
	errors.deadletterqueue.topic.replication.factor = 3
	errors.log.enable = false
	errors.log.include.messages = false
	errors.retry.delay.max.ms = 60000
	errors.retry.timeout = 0
	errors.tolerance = none
	errors.deadletterqueue.context.headers.enable = false
	errors.deadletterqueue.topic.name = 
	errors.deadletterqueue.topic.replication.factor = 3
	errors.log.enable = false
	errors.log.include.messages = false
	errors.retry.delay.max.ms = 60000
	errors.retry.timeout = 0
	errors.tolerance = none
[2025-09-30 13:17:10,089] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=78, connectorIds=[neo4j-children-node-sink], taskIds=[neo4j-children-node-sink-0, neo4j-children-node-sink-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 
...
[2025-09-30 13:28:02,553] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 4 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=81, connectorIds=[], taskIds=[], revokedConnectorIds=[neo4j-children-node-sink], revokedTaskIds=[neo4j-children-node-sink-0, neo4j-children-node-sink-1], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2025-09-30 13:28:02,556] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 5 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=81, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2025-09-30 13:28:09,797] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 6 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=82, connectorIds=[neo4j-children-node-sink], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 
...
[2025-09-30 13:28:09,809] INFO [Worker clientId=connect-connect:8083, groupId=compose-connect-group] Joined group at generation 7 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-connect:8083-36254c8f-995b-474f-bde0-0d420b8a3371', leaderUrl='http://connect:8083/', offset=85, connectorIds=[neo4j-children-node-sink], taskIds=[neo4j-children-node-sink-0, neo4j-children-node-sink-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Hi @georgelza

So no errors on the Kafka Connect side, right?

Anything on the Neo4j side?

Nothing I can see, other than the sink showing a FAILED status.

Unless I am missing a log entry or file…

G

I see.

Could you repost the error from the status output?

best,

michael

See attached.

Happy to pull more information if you can point me at what’s needed.

G

(Attachment err.txt is missing)

Hmmm, .log and .txt files are being rejected.

Pasting the contents was also rejected due to length.

Going to send it to you directly.

G

Hmmm, please explain “from status”. I posted the docker compose logs output, grepping for connect.

G

Looking for the output from:

curl -s http://localhost:8083/connectors/neo4j-children-address-node-sink/status | jq '.'

So the children-address sink is working, see below.

curl -s http://localhost:8083/connectors/neo4j-children-address-node-sink/status | jq '.'
{
  "name": "neo4j-children-address-node-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    }
  ],
  "type": "sink"
}

It’s the children sink that’s failing, below.

curl -s http://localhost:8083/connectors/neo4j-children-node-sink/status | jq '.'
{
  "name": "neo4j-children-node-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "connect:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.neo4j.driver.exceptions.ClientException: Invalid input '}': expected an identifier (line 1, column 235 (offset: 234))\n\"CYPHER 5 UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * MERGE (t:Children {nationalid: event.nationalid,}) ON CREATE SET t += {nationalid: event.nationalid, _id: event._id, name: event.name, surname: event.surname, gender: event.gender, dob: event.dob, family_id: event.family_id, father_nationalid: event.father_nationalid, mother_nationalid: event.mother_nationalid, parcel_id: event.address.parcel_id} ON MATCH SET t += {name: event.name, surname: event.surname, gender: event.gender, dob: event.dob, family_id: event.family_id, father_nationalid: event.father_nationalid, mother_nationalid: 
event.mother_nationalid, parcel_id: event.address.parcel_id}} RETURN NULL\"\n                                                                                                                                                                                                                                           ^\n\tat org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:111)\n\tat org.neo4j.driver.internal.InternalTransaction.run(InternalTransaction.java:58)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages$lambda$8$lambda$6$lambda$5(Neo4jSinkTask.kt:60)\n\tat org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)\n\tat org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)\n\tat org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)\n\tat org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:113)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:59)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:71)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:49)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 
11 more\n\tSuppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause\n\t\tat org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:76)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:107)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackFailureMessage(CommonMessageReader.java:75)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:53)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:81)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:37)\n\t\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\t\tat org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:42)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)\n\t\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)\n\t\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\t\tat 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\t... 1 more\n"
    },
    {
      "id": 1,
      "state": "FAILED",
      "worker_id": "connect:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.neo4j.driver.exceptions.ClientException: Invalid input '}': expected an identifier (line 1, column 235 (offset: 234))\n\"CYPHER 5 UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * MERGE (t:Children {nationalid: event.nationalid,}) ON CREATE SET t += {nationalid: event.nationalid, _id: event._id, name: event.name, surname: event.surname, gender: event.gender, dob: event.dob, family_id: event.family_id, father_nationalid: event.father_nationalid, mother_nationalid: event.mother_nationalid, parcel_id: event.address.parcel_id} ON MATCH SET t += {name: event.name, surname: event.surname, gender: event.gender, dob: event.dob, family_id: event.family_id, father_nationalid: event.father_nationalid, mother_nationalid: 
event.mother_nationalid, parcel_id: event.address.parcel_id}} RETURN NULL\"\n                                                                                                                                                                                                                                           ^\n\tat org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:111)\n\tat org.neo4j.driver.internal.InternalTransaction.run(InternalTransaction.java:58)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages$lambda$8$lambda$6$lambda$5(Neo4jSinkTask.kt:60)\n\tat org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)\n\tat org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)\n\tat org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)\n\tat org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:113)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:59)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:71)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:49)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 
11 more\n\tSuppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause\n\t\tat org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:76)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:107)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackFailureMessage(CommonMessageReader.java:75)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:53)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:81)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:37)\n\t\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\t\tat org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:42)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)\n\t\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)\n\t\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\t\tat 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\t... 1 more\n"
    }
  ],
  "type": "sink"
}
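
The trace above is long, but the part that matters is the first "Caused by:" line of each failed task. A minimal Python sketch for pulling just that line out of a status document follows. The function name `first_cause` and the shortened sample status are assumptions made for illustration; they are not part of Kafka Connect or the Neo4j connector.

```python
import json


def first_cause(status: dict) -> dict:
    """Map each FAILED task id to the first 'Caused by:' line of its trace."""
    causes = {}
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        trace = task.get("trace", "")
        # json.loads has already turned the escaped \n sequences into real newlines.
        for line in trace.splitlines():
            if line.startswith("Caused by:"):
                causes[task["id"]] = line
                break
    return causes


# Shortened, hypothetical sample in the shape of the status output in this thread.
sample = json.loads(r'''
{
  "name": "neo4j-children-node-sink",
  "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
  "tasks": [
    {"id": 0, "state": "FAILED", "worker_id": "connect:8083",
     "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat ...\nCaused by: org.neo4j.driver.exceptions.ClientException: Invalid input '}': expected an identifier (line 1, column 235 (offset: 234))\n\tat ..."},
    {"id": 1, "state": "RUNNING", "worker_id": "connect:8083"}
  ],
  "type": "sink"
}
''')

for task_id, cause in first_cause(sample).items():
    print(f"task {task_id}: {cause}")
```

Against a live worker, the sample could be swapped for `json.load(sys.stdin)` and the script fed from the same curl command, which keeps the output short enough to paste into a post.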

I’m not that deep into Neo4j, though the obvious error seems to be:

(t:Children {nationalid: event.nationalid,})

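The “,” after event.nationalid looks a bit suspicious to me. The error message points at the '}' right behind it (line 1, column 235), where Cypher expected another identifier.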
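
A trailing comma like this is easy to miss inside a one-line JSON string. As a rough pre-flight check, the sketch below scans the `neo4j.cypher.topic.*` values of a connector config for a comma that sits directly before a closing brace. The helper name `find_trailing_commas` is made up for this example, the two configs are abbreviated stand-ins for the real file, and the regex is only a heuristic: it does not parse Cypher and would also flag a `,}` inside a string literal.

```python
import re

# A comma followed only by whitespace and then '}' ends a map with no key after it.
TRAILING_COMMA = re.compile(r",\s*\}")


def find_trailing_commas(connector: dict) -> dict:
    """Return {config key: [character offsets]} for suspicious commas in Cypher values."""
    hits = {}
    for key, value in connector.get("config", {}).items():
        if not key.startswith("neo4j.cypher.topic."):
            continue
        offsets = [m.start() for m in TRAILING_COMMA.finditer(value)]
        if offsets:
            hits[key] = offsets
    return hits


# Abbreviated stand-in for create_children_node_sink.json (statement shortened).
broken = {
    "name": "neo4j-children-node-sink",
    "config": {
        "topics": "children",
        "neo4j.cypher.topic.children":
            "MERGE (t:Children {nationalid: event.nationalid,}) "
            "ON CREATE SET t += {name: event.name}",
    },
}

# Same config with the comma before the closing brace removed.
fixed = {
    "name": broken["name"],
    "config": {
        key: value.replace(",}", "}")
        for key, value in broken["config"].items()
    },
}

print(find_trailing_commas(broken))
print(find_trailing_commas(fixed))
```

The first call reports the `neo4j.cypher.topic.children` key with the offset of the comma; the second returns an empty dict.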

best,

michael

💥 BINGO…

Thank you, that was exactly it…
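
For anyone landing here with the same error: once the comma is removed, the corrected config still has to reach the worker. The scripts in this thread POST to /connectors, which creates a connector; for one that already exists, the Kafka Connect REST API also accepts a PUT to /connectors/<name>/config with just the config map. The sketch below only builds that request and does not send it. The helper name `build_update_request` is hypothetical, and the config is abbreviated, so the full config from the JSON file would be needed for a real call.

```python
import json
import urllib.request


def build_update_request(base_url: str, connector: dict) -> urllib.request.Request:
    """Build (but do not send) a PUT /connectors/<name>/config request."""
    name = connector["name"]
    # The PUT endpoint takes the bare config map, not the {"name", "config"} wrapper.
    body = json.dumps(connector["config"]).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


connector = {
    "name": "neo4j-children-node-sink",
    "config": {
        "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
        "topics": "children",
        # Abbreviated statement; the point is the map key without the trailing comma.
        "neo4j.cypher.topic.children":
            "MERGE (t:Children {nationalid: event.nationalid}) "
            "ON CREATE SET t += {name: event.name, surname: event.surname}",
    },
}

request = build_update_request("http://localhost:8083", connector)
print(request.get_method(), request.full_url)
# Sending is left out on purpose; urllib.request.urlopen(request) would perform the call.
```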

Now on to the other challenges for this sink into Neo4j: picking out an array of records and inserting them as separate nodes.

G
