Static membership in Kafka Connect

I’m trying to implement static membership in Kafka Connect. Our Kafka Connect cluster is deployed on Kubernetes (K8s) using the Strimzi Kafka operator.

I have tried putting the following worker config in the KafkaConnect YAML:

connector.client.config.override.policy: All
consumer.group.instance.id: somethingsomething

And in the HttpSinkConnector class I have this:

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<Map<String, String>> configs = new ArrayList<>(maxTasks);
    for (int i = 0; i < maxTasks; i++) {
        // Start each task from a copy of the connector's own config
        Map<String, String> configCopy = new HashMap<>(this.configProps);
        // Intended: give each task its own static member ID
        configCopy.put("consumer.override.group.instance.id", Thread.currentThread().getName());

        configs.add(configCopy);
    }
    return configs;
}

This resulted in org.apache.kafka.common.errors.FencedInstanceIdException, with log lines like …07:07:32,631 ERROR [Consumer instanceId=somethingsomething…, because all tasks got somethingsomething as their group.instance.id, although each should have gotten the value of Thread.currentThread().getName().
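A small standalone sketch of the loop above, stripped out of the connector class (the class and helper names are hypothetical), shows a second problem with the thread-name approach: taskConfigs() runs its whole loop on one thread, so Thread.currentThread().getName() returns the same string on every iteration. Even if task-level overrides were honored, all tasks would still share one group.instance.id, and duplicate instance IDs within one group are what cause the coordinator to fence a member with FencedInstanceIdException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ThreadNameIdDemo {

    // Mirrors the loop from the question: every task config gets the name
    // of the thread that happens to be running this method.
    static List<Map<String, String>> buildConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> configCopy = new HashMap<>();
            // Same thread on every iteration, so the same value every time.
            configCopy.put("consumer.override.group.instance.id", Thread.currentThread().getName());
            configs.add(configCopy);
        }
        return configs;
    }

    // Returns true if two or more configs carry the same value under `key`
    // (a missing key counts as null, and two nulls count as a duplicate).
    static boolean hasDuplicateIds(List<Map<String, String>> configs, String key) {
        Set<String> seen = new HashSet<>();
        for (Map<String, String> config : configs) {
            if (!seen.add(config.get(key))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Map<String, String>> configs = buildConfigs(3);
        System.out.println(hasDuplicateIds(configs, "consumer.override.group.instance.id"));
    }
}
```

Running main prints true: all three configs carry the name of the main thread.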

I have also tried just the following (without the worker configuration):

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<Map<String, String>> configs = new ArrayList<>(maxTasks);
    for (int i = 0; i < maxTasks; i++) {
        Map<String, String> configCopy = new HashMap<>(this.configProps);
        // Attempt: set the consumer's instance ID directly in the task config
        configCopy.put("consumer.group.instance.id", "somethingsomething");

        configs.add(configCopy);
    }
    return configs;
}

That did nothing (no errors, no instanceId in the logs), which means I put this config value in the wrong place.
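My reading of the Apache Kafka Worker code (treat this as an assumption, not a confirmed description) is that consumer.override.* entries are taken from the connector's own configuration, filtered by connector.client.config.override.policy, and layered on top of the worker's consumer.* settings, while the maps returned by taskConfigs() are handed to the task rather than used to build its consumer. The simplified model below uses hypothetical helper names and skips the policy check; it only illustrates why a key placed in a task config would not reach the consumer.

```java
import java.util.HashMap;
import java.util.Map;

public class OverrideSketch {

    // Copies every entry whose key starts with `prefix`, with the prefix removed.
    static Map<String, String> withPrefixStripped(Map<String, String> source, String prefix) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : source.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    // Simplified model (assumption): worker-level "consumer." settings form the
    // base, then connector-level "consumer.override." settings are layered on top.
    // Task configs returned by taskConfigs() are not consulted at all.
    static Map<String, String> effectiveConsumerConfig(Map<String, String> workerConfig,
                                                       Map<String, String> connectorConfig) {
        Map<String, String> consumer = withPrefixStripped(workerConfig, "consumer.");
        consumer.putAll(withPrefixStripped(connectorConfig, "consumer.override."));
        return consumer;
    }

    public static void main(String[] args) {
        Map<String, String> worker = Map.of("consumer.group.instance.id", "somethingsomething");
        Map<String, String> connector = Map.of("consumer.override.group.instance.id", "http-sink-static");
        // Prints the connector-level value, which overrides the worker-level one.
        System.out.println(effectiveConsumerConfig(worker, connector).get("group.instance.id"));
    }
}
```

If this model is right, a consumer.override.group.instance.id set in the connector config would still be a single value shared by every task of that connector, so with tasks.max above 1 it would reproduce the same fencing.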

So how can I achieve static membership on Kafka Connect?

@ommr101 can you further define your desired behavior? What specifically do you mean by “static membership”?

Hi @rick,
Static membership is a built-in Kafka feature where you give each consumer in a consumer group a unique ID (group.instance.id) that is kept across transient failures (such as a machine restart or a rolling restart). Because of that, the consumer does not send a LeaveGroup request when it shuts down, and no rebalance is triggered as long as it rejoins within session.timeout.ms.
(More info here: KIP-345: Introduce static membership protocol to reduce consumer rebalances.)
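For reference, a plain consumer opts into static membership through group.instance.id, together with a session.timeout.ms long enough to cover a restart. The sketch below only builds the Properties (standard library only, so no kafka-clients dependency is needed to run it); the broker address, group name, instance ID, and timeout are placeholder values.

```java
import java.util.Properties;

public class StaticMemberProps {

    // Builds plain consumer properties for a static member (KIP-345).
    static Properties staticMemberProps(String groupId, String instanceId, int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", groupId);
        // Must be unique within the group and stable across restarts of this consumer.
        props.setProperty("group.instance.id", instanceId);
        // A static member that stays away longer than this is removed, and a rebalance follows.
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = staticMemberProps("connect-http-sink", "http-sink-task-0", 60000);
        // With kafka-clients on the classpath, these would be passed to new KafkaConsumer<>(props).
        props.list(System.out);
    }
}
```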

Since a Kafka Connect sink connector is essentially an abstraction on top of a consumer group (the connector is a consumer group and each task is a consumer), we should theoretically be able to implement static membership simply by giving each task a unique ID in its configuration.
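One way to get IDs that are both unique and stable is to derive them from the connector name and task index rather than from runtime state such as thread names. The sketch assumes the default sink consumer group name ("connect-" plus the connector name) and uses a hypothetical naming scheme for the instance IDs; it does not show how to get these values into each task's consumer, which is the open question in this thread.

```java
import java.util.ArrayList;
import java.util.List;

public class StaticIdScheme {

    // Default consumer group for a sink connector: "connect-" + connector name.
    static String sinkGroupId(String connectorName) {
        return "connect-" + connectorName;
    }

    // Hypothetical scheme: one ID per task index. Task indexes should stay the
    // same across worker restarts as long as tasks.max does not change.
    static List<String> instanceIds(String connectorName, int maxTasks) {
        List<String> ids = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) {
            ids.add(connectorName + "-task-" + i);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(sinkGroupId("http-sink"));
        System.out.println(instanceIds("http-sink", 3));
    }
}
```

Running main prints connect-http-sink followed by [http-sink-task-0, http-sink-task-1, http-sink-task-2].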

That would allow us to skip redundant rebalances on every rolling restart (or pod relocation, in the case of a Kubernetes deployment).

Ultimately, I’m trying to pass a unique configuration to each task, as described in the question, but without success…

@ommr101 since the actual consumer configuration key is group.instance.id, have you tried removing the consumer. and/or consumer.override. prefixes in your config-building routines? I’m looking through the AK (Apache Kafka) / connector code to trace the config paths, but thought I’d suggest it in case it’s worth a try.

@rick Yes, I have tried that, but without success; it looked like the underlying consumers could not pick up this configuration (group.instance.id).

And just so we’re on the same page: in that attempt, I removed the configs mentioned in the question from the worker config.
(Those configs were:

connector.client.config.override.policy: All
consumer.group.instance.id: somethingsomething
)

@rick Hi, any updates?