Different Kafka cluster and different Kafka Connect cluster

Hi,

I have a Kafka cluster with 4 brokers and 5 ZooKeeper nodes in production.
I have another Kafka cluster with 3 hosts, each running Kafka and ZooKeeper, and I am treating this as my Kafka Connect cluster.

I am running Kafka Connect in distributed mode and it is working fine; I am successfully running a sink task from Kafka to MongoDB.

My question: in distributed mode I have specified the number of tasks as -13. Since the Kafka Connect cluster has 3 workers, the tasks should be distributed across the workers, but this is not happening.

Can anyone guide me, please? @rmoff @rick please help.

@ranjan1302 can you clarify this further? -13 is confusing. Can you share the specific connector configuration (secrets redacted)?

tasks.max is the maximum number of Connect tasks. For sink connectors you are still bound by the number of topic partitions: each partition is consumed by at most one task, so any tasks beyond the partition count will sit idle.
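
A quick way to sanity-check this (a sketch; the topic name my-topic and the broker address are placeholders, not from this thread):

$ kafka-topics.sh --describe --topic my-topic --bootstrap-server broker:9092
(on older Kafka versions, pass --zookeeper zk:2181 instead of --bootstrap-server)

If the topic reports, say, 5 partitions, a sink connector with tasks.max=15 may still start 15 tasks, but only 5 of them will be assigned partitions.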

I have specified tasks.max=15.

@rick, my distributed properties are below:
bootstrap.servers=Host:port   # from the production Kafka cluster
plugin.path=/app/kafka-manager-2.0.0.2/lib
group.id=kafka-connect-cluster-prod-1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=_connect-offsets-1
offset.storage.replication.factor=3
config.storage.topic=_connect-configs-1
config.storage.replication.factor=3
status.storage.topic=_connect-status-1
status.storage.replication.factor=3
#status.storage.partitions=5

offset.flush.interval.ms=10000

rest.port=8085
rest.advertised.host.name=Host1,host2,host3   # the 3 hosts of the other cluster, used as Kafka Connect
rest.advertised.port=8085

#SSL
Under this section I specify all the SSL-related details, and that part works fine.

And @rick, it is running fine, but I expected the tasks to be distributed across all 3 workers.

$ curl localhost:8085/connectors/mongo-sink-distributed2/status |jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   852  100   852    0     0   3527      0 --:--:-- --:--:-- --:--:--  3550
{
  "name": "mongo-sink-distributed2",
  "connector": {
    "state": "RUNNING",
    "worker_id": "null:-1"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 2,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 3,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 4,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 5,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 6,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 7,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 8,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 9,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 10,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 11,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 12,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 13,
      "state": "RUNNING",
      "worker_id": "null:-1"
    },
    {
      "id": 14,
      "state": "RUNNING",
      "worker_id": "null:-1"
    }
  ],
  "type": "sink"
}

As you can see above, worker_id is null.

I'm not certain, but the above seems to indicate that the configuration of rest.host.name and/or rest.advertised.host.name is invalid, which may leave the workers unable to communicate over HTTP.
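
One quick check (the hostname here is a placeholder for your worker nodes): each worker's REST endpoint should be reachable from the other workers, and the root endpoint returns the worker's version info.

$ curl http://worker1:8085/
{"version":"...","commit":"..."}

If this call fails from a peer node, the workers cannot forward requests to each other.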

@rick I am setting rest.advertised.host.name to all 3 hosts of the Kafka Connect cluster, separated by commas.

@rmoff can you help here, please?

Yes, this is confusing, please clarify. The setting should be the single hostname that each worker advertises as the host on which other workers can reach it.
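
In other words, each worker gets its own value rather than a comma-separated list, e.g. (hostnames illustrative):

# worker.properties on host1
rest.advertised.host.name=host1

# worker.properties on host2
rest.advertised.host.name=host2

# worker.properties on host3
rest.advertised.host.name=host3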

OK, can you let me know / suggest: I have 15 tasks for a sink connector and I want to distribute them across all the workers available in the Kafka Connect cluster. How can I do that?


@rick, I have a sink connector with 3 or 15 or any number of tasks. How can I distribute the tasks across all the workers?

@rick, I resolved the issue with continuous effort. Thanks for all your responses. Much appreciated.

What did I do?
Ans:
Since I have 3 worker nodes, I used 2 out of 3. I placed the worker.properties (distributed property file) on both nodes and added the parameter:

on worker 1 → rest.advertised.host.name=worker1 (node 1 name)
on worker 2 → rest.advertised.host.name=worker2 (node 2 name)

After that I started the workers with worker.properties on both nodes and then started the sink task with tasks.max=15.
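
For reference, starting a distributed worker looks like this (a sketch assuming a standard Apache Kafka install; paths are illustrative):

$ bin/connect-distributed.sh config/worker.properties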

So now I could see that, out of the 15 tasks, some were running on worker 1 and some on worker 2, like below:

{
  "id": 2,
  "state": "RUNNING",
  "worker_id": "Node2:8085"
},
{
  "id": 3,
  "state": "RUNNING",
  "worker_id": "Node1:8085"
},
…
…
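
To confirm the spread at a glance, the status output can be grouped by worker (a sketch using jq and the connector name from above):

$ curl -s localhost:8085/connectors/mongo-sink-distributed2/status \
    | jq '[.tasks[].worker_id] | group_by(.) | map({worker: .[0], tasks: length})'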


This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.