Sink JSON input file

Hi all

Does anyone know whether the command that's executed against the target can be pulled in from a separate file? The command contained in "neo4j.cypher.topic.adults" becomes very, very long, and it would be nice if I could extract it into a separate file where I can format it more readably.

{
  "name": "neo4j-adults-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "adults",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.adults": "MERGE (t:Adults {nationalid: event.nationalid}) ON CREATE SET t += {nationalid: event.nationalid, _id: event._id, name: event.name, surname: event.surname, gender: event.gender, dob: event.dob, marital_status: event.marital_status, partner: event.partner, status: event.status, parcel_id: event.address.parcel_id}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}

Check out Kafka Connect's FileConfigProvider for this. It's not quite what you want, since it substitutes values from a properties file rather than pulling in the contents of a JSON file, but it might work well enough to clean things up.

Make sure the provider is enabled, e.g., in Docker add these Connect configs:

CONNECT_CONFIG_PROVIDERS: file
CONNECT_CONFIG_PROVIDERS_FILE_CLASS: org.apache.kafka.common.config.provider.FileConfigProvider
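
If you're running Connect outside Docker, the same two settings go in the worker properties file (e.g. connect-distributed.properties) rather than as environment variables:

```properties
# Register the file config provider under the alias "file"
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
```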

Then your connector config would have:

"neo4j.cypher.topic.adults": "${file:/path/to/connector.properties:neo4j.cypher.topic.adults}"

And then I think /path/to/connector.properties would have to use backslash line continuations to get nice formatting:

neo4j.cypher.topic.adults=\
MERGE (t:Adults { \
  nationalid: event.nationalid \
}) \
  ON CREATE  SET t += { \
...


Going to try it… but I hate the line-continuation "\".

Since it's known that this is a MERGE statement, why couldn't the coders have avoided demanding "\"? It makes nice code look not so nice.

Any chance you can point me to the doc where you found this?

G

There are a couple of docs pages on this, one in the Apache Kafka docs and one in the Confluent docs.

This feature landed in KIP-297. It's based on Java Properties files, so the backslashes I'm suggesting (warning: I haven't tested them) come from the fact that that's how you get a multi-line value in a Properties file.

I see there's also an EnvVarConfigProvider from KIP-887 that you can try. Put the MERGE statement in a file with no backslashes, set an environment variable NEO4J_CYPHER to $(cat /path/to/file.txt), and reference it in the connector config:

"neo4j.cypher.topic.adults": "${envVarProvider:NEO4J_CYPHER}"

Thank you.

Will try it and let you know what worked.

G

Hi there

Liked this. See below:

– create_children_node_sink.sh

#!/bin/bash

# Neo4j Kafka Connect Sink Configurations.
# =============================================================================
# Children Nodes Sink
# =============================================================================
echo "Creating 'Children' nodes sink..."
export NEO4J_CYPHER="$(cat create_children_node_merge.json)"  # quotes preserve the multi-line Cypher
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_children_node_sink.json

# =============================================================================
# STATUS CHECK COMMANDS
# =============================================================================
echo ""
echo "Checking connector status..."
echo "=========================="

echo "Children sink status:"
curl -s http://localhost:8083/connectors/neo4j-children-node-sink/status | jq '.'
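
One caveat worth flagging (untested on my side): EnvVarConfigProvider reads the environment of the Connect worker process, not the shell that submits the connector, so the `export` above only takes effect if the worker is launched from that same shell. If Connect runs in a container, the variable has to be passed into that container instead, e.g. in docker-compose (service name assumed):

```yaml
services:
  connect:                       # assumed name of the Connect worker service
    environment:
      # forward NEO4J_CYPHER from the host shell into the worker's environment
      NEO4J_CYPHER: ${NEO4J_CYPHER}
```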

– create_children_node_sink.json

{
  "name": "neo4j-children-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "children",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.children": "${envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}

– create_children_node_merge.json

MERGE (t:Children {nationalid: event.nationalid}) 
  ON CREATE SET t += {
    nationalid: event.nationalid, 
    _id: event._id, 
    name: event.name, 
    surname: event.surname, 
    gender: event.gender, 
    dob: event.dob, 
    family_id: event.family_id, 
    father_nationalid: event.father_nationalid, 
    mother_nationalid: event.mother_nationalid, 
    parcel_id: event.address.parcel_id
} 
  ON MATCH SET t += {
    name: event.name, 
    surname: event.surname, 
    gender: event.gender, 
    dob: event.dob, 
    family_id: event.family_id, 
    father_nationalid: event.father_nationalid, 
    mother_nationalid: event.mother_nationalid, 
    parcel_id: event.address.parcel_id
}

… now, I wonder whether the *_merge.json file can contain multiple MERGE statements.

G

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.