Hi guys, need some help.
Attaching my docker-compose file that spins the environment up.
Just seeing it fail in console, not getting anything really in logs if I grep for connect |grep for account
I have 3 of these sinks to get configured, thinking once I got the first worked the others will follow.
Please help…
I have 2 topics:
ob_account_holders
ib_account_holders
wanting to use the following cypher
"neo4j.topic.cypher.ob_account_holders": "CREATE (a:AccountHolder {accountEntityId: event.accountEntityId, bicfi: event.bicfi, accountId: event.accountId, tenantId: event.tenantId, accountAgentId: event.accountAgentId, fullName: event.fullName}) ON DUPLICATE KEY IGNORE",
"neo4j.topic.cypher.ib_account_holders": "CREATE (a:AccountHolder {accountEntityId: event.accountEntityId, bicfi: event.bicfi, accountId: event.accountId, tenantId: event.tenantId, accountAgentId: event.accountAgentId, fullName: event.fullName}) ON DUPLICATE KEY IGNORE",
note: the connector.class I extracted using
curl -X GET http://localhost:8083/connector-plugins | jq .
plan was to use the following sink config
{
"name": "neo4j-accountHolder-node-sink",
"config": {
"connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"topics": "ob_account_holders,ib_account_holders",
"neo4j.uri": "bolt://localhost:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "dbpassword",
"neo4j.topic.cypher.ob_account_holders": "CREATE (a:AccountHolder {accountEntityId: event.accountEntityId, bicfi: event.bicfi, accountId: event.accountId, tenantId: event.tenantId, accountAgentId: event.accountAgentId, fullName: event.fullName}) ON DUPLICATE KEY IGNORE",
"neo4j.topic.cypher.ib_account_holders": "CREATE (a:AccountHolder {accountEntityId: event.accountEntityId, bicfi: event.bicfi, accountId: event.accountId, tenantId: event.tenantId, accountAgentId: event.accountAgentId, fullName: event.fullName}) ON DUPLICATE KEY IGNORE",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"tasks.max": "2",
"neo4j.batch.size": "1000",
"neo4j.batch.timeout.msecs": "5000",
"neo4j.retry.backoff.msecs": "3000",
"neo4j.retry.max.attemps": "5"
}
}
docker-compose.yml
services:
# begin Confluent Kafka cluster
broker:
image: confluentinc/cp-kafka:7.9.1
container_name: broker
hostname: broker
ports:
- 9092:9092
- 9101:9101
environment:
KAFKA_NODE_ID: 1001
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29193,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: "1001@broker:29193"
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: /var/lib/kafka/data
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
CLUSTER_ID: ${CLUSTER_ID}
volumes:
- ./data/confluent.d/broker/data:/var/lib/kafka/data
- ./data/confluent.d/broker/log4j:/var/log/kafka
schema-registry:
image: confluentinc/cp-schema-registry:7.9.1
container_name: schema-registry
hostname: schema-registry
depends_on:
- broker
ports:
- 9081:9081
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:9081
control-center:
image: confluentinc/cp-enterprise-control-center:7.9.1
container_name: control-center
hostname: control-center
depends_on:
- broker
ports:
- 9021:9021 # -> Web UI console
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:9081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
connect:
# build:
# context: .
# dockerfile: connect/Dockerfile
image: ${REPO_NAME}/kafka-connect-custom:2.1
container_name: connect
hostname: connect
depends_on:
- broker
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:9081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.6.1.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
# end Clonfluent cluster
neo4j:
container_name: neo4j
hostname: neo4j
image: neo4j:latest
ports:
- 7474:7474 # Console/Web Interface
- 7687:7687
environment:
- NEO4J_AUTH=${NEO4J_USERNAME}/${NEO4J_PASSWORD}
- NEO4J_apoc_export_file_enabled=true
- NEO4J_apoc_import_file_enabled=true
- NEO4J_apoc_import_file_use__neo4j__config=true
- NEO4J_PLUGINS=["apoc", "graph-data-science"]
volumes:
- ./data/neo4j_db/data:/data
- ./data/neo4j_db/logs:/logs
- ./data/neo4j_db/plugins:/plugins
- ./data/neo4j_data:/var/lib/neo4j/import
healthcheck:
test: ["CMD-SHELL", "wget --no-verbose --tries=1 --spider http://localhost:7474 || exit 1"]
interval: 5s
timeout: 5s
retries: 5
cypher-importer:
container_name: cypher-importer
hostname: cypher-importer
image: neo4j/neo4j-admin:latest
environment:
NEO4J_URI: bolt://neo4j:7687
NEO4J_USERNAME: ${NEO4J_USERNAME}
NEO4J_PASSWORD: ${NEO4J_PASSWORD}
volumes:
- ./data/neo4j_data/import.cypher:/var/lib/neo4j/import/import.cypher
depends_on:
neo4j:
condition: service_healthy
command: ["/bin/bash", "-c", "sleep 10 && cypher-shell -u ${NEO4J_USERNAME} -p ${NEO4J_PASSWORD} -f /var/lib/neo4j/import/import.cypher"]
# Without a network explicitly defined, you hit this Hive/Thrift error
# java.net.URISyntaxException Illegal character in hostname
# https://github.com/TrivadisPF/platys-modern-data-platform/issues/231
networks:
default:
name: ${COMPOSE_PROJECT_NAME}
connector.class output
curl -X GET http://localhost:8083/connector-plugins |jq .
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 690 100 690 0 0 125k 0 --:--:-- --:--:-- --:--:-- 134k
[
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "10.8.0"
},
{
"class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"type": "sink",
"version": "5.1.13"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "10.8.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "7.7.1-ccs"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "7.7.1-ccs"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "7.7.1-ccs"
},
{
"class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
"type": "source",
"version": "5.1.13"
}
]