Greetings,
Beginner Problem:
I’m running a Kafka environment (machine #1) on a standalone machine and MySQL on a separate machine (machine #2). The Kafka environment is running well, with an Avro(ized) topic. My goal is to use a Connector to sink the data from the topic to the MySQL database.
However, when I start the Connect instance on the database machine (#2), pointed at the ZooKeeper socket of the Kafka environment (#1), I receive the following message:
WARN [AdminClient clientId=adminclient-1] Connection to node 0 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:776)
So, it appears to be looking for the broker on the localhost, rather than at machine #1.
----- CONNECT CLI -----
$ kafka/bin/connect-distributed.sh etc/schema-registry/connect-avro-distributed.properties
****** connect-avro-distributed.properties *****
#Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=ip_address_of_computer1:9092
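As a sanity check, the following is a minimal sketch for confirming which bootstrap.servers value the worker file really contains, in case a commented-out or duplicate entry is being picked up instead. The function names get_bootstrap_servers and split_servers are hypothetical helpers written for this post, not part of Kafka or Confluent tooling, and the sketch assumes a bash shell with standard grep, sed, and tr.

```shell
#!/usr/bin/env bash

# Print the value of bootstrap.servers from a Connect worker properties file.
# Lines starting with '#' are ignored; if the key appears more than once,
# the last occurrence is printed (later entries override earlier ones).
get_bootstrap_servers() {
  local props_file="$1"
  grep -E '^[[:space:]]*bootstrap\.servers[[:space:]]*=' "$props_file" \
    | tail -n 1 \
    | sed -E 's/^[^=]*=[[:space:]]*//; s/[[:space:]]+$//'
}

# Split a comma-separated host:port list into one entry per line,
# trimming whitespace around each entry.
split_servers() {
  printf '%s\n' "$1" \
    | tr ',' '\n' \
    | sed -E 's/^[[:space:]]+//; s/[[:space:]]+$//'
}

# Usage (path taken from the CLI call above; adjust to your install):
#   servers=$(get_bootstrap_servers etc/schema-registry/connect-avro-distributed.properties)
#   split_servers "$servers"
```

If this prints the address of machine #1 while the log still shows localhost, the worker file itself is probably being read correctly and the localhost address is coming from somewhere else.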
----- Schema Registry CLI -----
$ bin/schema-registry-start etc/schema-registry/schema-registry.properties
***** Kafka Connect REST API *****
sudo curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
-H "Content-Type: application/json" -d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://localhost:3306/test",
"topics" : "demo_altered",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"connection.user" : "username",
"connection.password" : "password",
"auto.create" : true,
"auto.evolve" : true,
"insert.mode" : "upsert",
"pk.mode" : "record_key",
"pk.fields" : "MESSAGE_KEY",
"transforms" : "keepSome,addSome",
"transforms.keepSome.type" : "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.keepSome.whitelist" : "MESSAGE_KEY,FLIGHT,MODES,ALT,SPEED,HEADING",
"transforms.addSome.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addSome.timestamp.field" : "RECORD_TS",
"transforms.addSome.static.field" : "source",
"transforms.addSome.static.value" : "demo_altered"
}'
Let me know if you need any further information.
There must be a configuration item I’m missing.
Thank you for your help.