Thank you for your help.
I’m running a Kafka broker (computer #1) and Kafka Connect (sink) on a separate machine (computer #2).
On computer #2 the following is up and running:
- Connect (“connect-avro-distributed.properties”)
- Schema Registry (“schema-registry.properties”)
- KSQLDB
- KSQL CLI
- MySQL w/CLI
Now that I have Connect running and acknowledged by the broker (computer #1) as a member of the group, when I attempt to create a connector via the Connect REST API (the curl command at the bottom) I get the following error in the CLI:
{"error_code":409,"message":"Cannot complete request because of a conflicting operation (e.g. worker rebalance)"}
The following message occurs in the Connect log:
INFO AbstractConfig values:
(org.apache.kafka.common.config.AbstractConfig:372)
I found @rmoff’s blog post on the topic, so I altered rest.advertised.host.name in my connect-avro-distributed.properties (see below). I’m still getting the error.
- Did I alter the correct properties file (connect-avro-distributed.properties)?
- Did I misunderstand? Is rest.advertised.host.name supposed to be the IP address of computer #2 (Connect)? See the properties below.
- Could the conflict be with a configuration on the broker (computer #1)?
The following are my property files on computer #2:
---- Computer #2 Properties (CONNECT) -----
***** schema-registry.properties *****
listeners=http://0.0.0.0:8081
kafkastore.connection.url=ip.address.computer2:2181
# kafkastore.bootstrap.servers=ip.address.computer2:9092 (COMMENTED OUT)
kafkastore.topic=_schemas
debug=false
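(As a sanity check on the registry config above — and I’m assuming these are the standard endpoints for it — my understanding is that a couple of plain GETs should confirm it is up and reachable on 8081:)
# Confirm the Schema Registry responds and list any registered subjects
curl http://localhost:8081/subjects
# Check the global compatibility setting
curl http://localhost:8081/config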
***** connect-avro-distributed.properties *****
bootstrap.servers=ip.address.computer1:9092
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.host.name=ip.address.computer2
rest.port=8083
rest.advertised.host.name=ip.address.computer2
rest.advertised.port=8083
plugin.path=share/java,…
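(Related to my rest.advertised.host.name question above: my assumption is that the worker’s REST interface should at least answer on that address, so I’ve been thinking of a reachability check like the following, run from computer #1 — is that a reasonable way to verify it?)
# From computer #1, confirm the Connect REST interface answers on the advertised address
curl http://ip.address.computer2:8083/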
***** Connector Configuration *****
sudo curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
-H "Content-Type: application/json" -d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://localhost:3306/test",
"topics" : "demo_altered",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"connection.user" : “username”,
"connection.password" : “password”,
"auto.create" : true,
"auto.evolve" : true,
"insert.mode" : "upsert",
"pk.mode" : "record_key",
"pk.fields" : "MESSAGE_KEY"
}'
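Once the PUT above is accepted, my plan (assuming the standard status endpoint applies here) is to confirm the connector and its task are running with:
# Check the connector and task state once it has been created
curl http://localhost:8083/connectors/sink-jdbc-mysql-01/status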
Thank you for your help.