What is the actual maximum value for the tasks.max property?

If I have a JDBC sink connector that consumes data from 10 topics, each with 6 partitions, what is the maximum value I can set for the tasks.max property such that every task actually contributes to the data processing? Is it 6 (the highest number of partitions for a single topic) or 60 (the number of topics multiplied by the number of partitions)?

Hi @Radu,

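Because each sink task uses a single consumer to poll the 10 topics, and because all tasks of a connector belong to the same consumer group, the effective max in your example is 6. This assumes the consumer's default range assignor, which divides each topic's partitions separately, so with 6 partitions per topic only 6 tasks ever receive an assignment.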
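To make the arithmetic concrete, here is a minimal sketch that simulates range-style assignment, where each topic's partitions are divided independently over the sorted group members. It assumes the consumer's default range assignor is in effect; the function names and the task/topic names are made up for illustration, and this is not Kafka's actual code.

```python
from collections import defaultdict


def range_assign(topics, consumers):
    """Simulate range-style assignment: each topic is split independently
    into contiguous ranges over the sorted consumer list."""
    assignment = defaultdict(list)
    members = sorted(consumers)
    for topic, num_partitions in sorted(topics.items()):
        per_member, extra = divmod(num_partitions, len(members))
        start = 0
        for index, member in enumerate(members):
            # The first `extra` members take one additional partition.
            count = per_member + (1 if index < extra else 0)
            assignment[member].extend((topic, p) for p in range(start, start + count))
            start += count
    return assignment


def active_tasks(topics, task_count):
    """Count how many tasks end up with at least one partition."""
    tasks = [f"task-{i:02d}" for i in range(task_count)]  # hypothetical member ids
    assignment = range_assign(topics, tasks)
    return sum(1 for task in tasks if assignment[task])


topics = {f"topic-{i}": 6 for i in range(10)}  # 10 topics x 6 partitions
for n in (3, 6, 10, 60):
    print(n, "tasks ->", active_tasks(topics, n), "active")
```

Because every topic is divided the same way, the same 6 members win a partition from each topic, and any task beyond the sixth stays idle.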

You can verify this with kafka-consumer-groups. The consumer group created will have ID connect-<CONNECTOR_NAME>, e.g.:

kafka-consumer-groups --bootstrap-server <BOOTSTRAP> --describe --group connect-my-connector

This will print out partition assignments. The consumer and client IDs that get printed out include the task ID. The client ID that gets printed looks like connector-consumer-<CONNECTOR_NAME>-<TASK_ID> by default. If you want to map this to which workers are actually consuming, you can use the Connect REST API:

curl -s -H "Content-Type: application/json" -X GET <CONNECT_REST_ENDPOINT>/connectors/<CONNECTOR_NAME>/status | jq

This will give you task ID / worker ID pairs.
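If you'd rather not read the JSON by eye, a small sketch like the following pulls the pairs out of the status response. The sample payload is illustrative only (the connector name and worker addresses are invented); it follows the shape of the status response, where each entry under tasks carries id, state and worker_id.

```python
def task_worker_pairs(status):
    """Return (task_id, worker_id, state) tuples from a connector status payload."""
    return [(t["id"], t["worker_id"], t["state"]) for t in status.get("tasks", [])]


# Illustrative payload; in practice this is the parsed JSON returned by
# GET <CONNECT_REST_ENDPOINT>/connectors/<CONNECTOR_NAME>/status
sample_status = {
    "name": "my-connector",
    "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"},
        {"id": 1, "state": "RUNNING", "worker_id": "10.0.0.2:8083"},
    ],
    "type": "sink",
}

for task_id, worker_id, state in task_worker_pairs(sample_status):
    print(f"task {task_id} on {worker_id}: {state}")
```

The task IDs here are the same ones that appear at the end of the client IDs printed by kafka-consumer-groups, so the two outputs can be joined on that value.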

HTH,
Dave

So the only way to scale past this limit of 6 tasks is to split into multiple connectors?
This is what I thought.

My concern, though, is this: suppose we start with a single connector that consumes from all 10 topics, and later need to scale past the maximum of 6 tasks by splitting it into multiple connectors. I found no way to carry the previous consumer group's offsets over to the newly split connectors, so the split triggers reprocessing of all the data.
This could be a problem for data-intensive flows.

Any better approach?

Also, any idea why having more than 6 active tasks is not supported, rather than splitting the 6 x 10 = 60 partitions among the tasks?

So the only way to scale past this limit of 6 tasks is to split into multiple connectors?

Yes, as far as I know, running multiple connectors is the only way to scale the number of active sink connector tasks in this scenario.

I found no way to carry the previous consumer group's offsets over to the newly split connectors, so the split triggers reprocessing of all the data.
This could be a problem for data-intensive flows.
Any better approach?

If the writes are idempotent, then re-consuming the data sounds reasonable; I'd only rule it out if testing showed the performance impact to be unacceptable.

One possibility (with a large YMMV caveat: you must test this) is to take the previous connector’s consumer group offsets and apply them to the new group that you get with the new connector name(s).

The process might look like:

  1. Stop the old connector
  2. Get all topic-partition offsets for the connector’s consumer group:
kafka-consumer-groups --bootstrap-server <BOOTSTRAP> --describe --group connect-<OLD_CONNECTOR_NAME>
  3. Create consumer group connect-<NEW_CONNECTOR_NAME> with kafka-console-consumer, i.e., consume from the topic(s) that the new connector will consume from
  4. Update offsets of that group with the old connector’s offsets for every topic:partition pair:
kafka-consumer-groups --bootstrap-server <BOOTSTRAP> --group connect-<NEW_CONNECTOR_NAME> --topic <TOPIC:PARTITION> --reset-offsets --to-offset <OLD CONNECTOR'S_OFFSET> --execute

And repeat steps 3 and 4 for each new connector instance.
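With 60 topic-partitions, typing the reset command by hand gets tedious, so here is a rough sketch of how the offsets from the describe step could be turned into the reset commands for one new connector. It only builds and prints the commands; nothing is executed. The helper names, the sample output, the connector name orders-sink and localhost:9092 are all assumptions for illustration, and the parser finds columns by the TOPIC, PARTITION and CURRENT-OFFSET headers since the exact column layout can differ between Kafka versions.

```python
import shlex


def parse_committed_offsets(describe_output):
    """Extract {(topic, partition): offset} from `--describe` output,
    locating columns by header name rather than by position."""
    offsets = {}
    header = None
    for line in describe_output.splitlines():
        fields = line.split()
        if not fields:
            continue
        if "TOPIC" in fields and "CURRENT-OFFSET" in fields:
            header = fields
            continue
        if header is None or len(fields) < len(header):
            continue
        row = dict(zip(header, fields))
        if row["CURRENT-OFFSET"].isdigit():  # "-" means nothing committed yet
            offsets[(row["TOPIC"], int(row["PARTITION"]))] = int(row["CURRENT-OFFSET"])
    return offsets


def build_reset_commands(offsets, topics, new_connector, bootstrap):
    """Build one reset command per topic-partition the new connector will read."""
    commands = []
    for (topic, partition), offset in sorted(offsets.items()):
        if topic not in topics:
            continue
        commands.append([
            "kafka-consumer-groups", "--bootstrap-server", bootstrap,
            "--group", f"connect-{new_connector}",
            "--topic", f"{topic}:{partition}",
            "--reset-offsets", "--to-offset", str(offset), "--execute",
        ])
    return commands


# Illustrative output only; real output depends on your cluster and Kafka version.
sample_describe = """\
GROUP             TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
connect-old-sink  orders  0          120             150             30   -            -     -
connect-old-sink  orders  1          -               10              -    -            -     -
connect-old-sink  users   0          42              42              0    -            -     -
"""

offsets = parse_committed_offsets(sample_describe)
for command in build_reset_commands(offsets, {"orders"}, "orders-sink", "localhost:9092"):
    print(shlex.join(command))  # review the printed commands before running them
```

Partitions with no committed offset are skipped, so the new connector would fall back to its auto.offset.reset behaviour for those.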

I’m not certain about using the console consumer in step 3… it should work, but if I’m missing something another option would be to run the new connector for a bit just to get the consumer group created. Then stop it and update offsets.

If you try this, please report back here on how it worked.

Also, any idea why having more than 6 active tasks is not supported, rather than splitting the 6 x 10 = 60 partitions among the tasks?

This is a design decision that likely goes way back. I don’t see anything in the Connect KIP but here’s my 2c…

I would guess that Connect didn’t go down this road because it would end up having to solve a bunch of problems that consumer groups already solve. To scale up to the number of topic-partitions, you’d have to coordinate topic-partition assignments yourself and use KafkaConsumer.assign, which would sacrifice consumer group management. Connect would then have to handle new tasks and partitioning changes on its own, in effect reinventing consumer group management. That’s a big lift: there’s an obvious benefit, but if it was considered at all, it was likely deemed too costly.
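To illustrate what that coordination would involve, here is a hypothetical static plan that deals all 60 topic-partitions out to the tasks round-robin, which is the kind of table an assign-based approach would have to compute and then recompute on every task or partition change. This is purely a sketch of the idea, not something Connect does, and the function name is invented.

```python
def round_robin_split(topics, task_count):
    """Deal every (topic, partition) pair out to tasks in turn."""
    partitions = [(t, p) for t, n in sorted(topics.items()) for p in range(n)]
    plan = [[] for _ in range(task_count)]
    for i, topic_partition in enumerate(partitions):
        plan[i % task_count].append(topic_partition)
    return plan


topics = {f"topic-{i}": 6 for i in range(10)}  # 10 topics x 6 partitions
for n in (8, 60):
    sizes = [len(task_partitions) for task_partitions in round_robin_split(topics, n)]
    print(n, "tasks -> partitions per task:", sizes)
```

The plan itself is trivial; the hard part is everything around it, such as detecting a dead task, noticing added partitions, and moving partitions without two tasks reading the same one.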

Dave

Makes sense. Since the number of partitions allocated to each consumer would not be uniform and would depend on how many tasks there are, the consumer group management provided by Kafka would indeed have to be overridden.

Great points, thanks for all the support! Really appreciate it!
