Kafka Connect with k8s config

Hi, we are running Kafka Connect on top of k8s for a CDC use case. We have a 60-second end-to-end freshness SLO, and we would like some guidance on the setup.

  • Rebalance delay
    There is a 5 min rebalance delay controlled by scheduled.rebalance.max.delay.ms, which means we will see at least 5 min of freshness lag whenever a pod gets recycled by K8s. We tried setting it to 30 sec, and with incremental rebalancing we seem to take only a few seconds of lag each time we trigger a rebalance. Did we miss some reason the default is set to 5 min?

  • Rolling restart
    During a deployment we use a rolling restart, and we want to make sure the previous worker has started taking tasks before the next pod gets restarted, so that only the tasks assigned to a single worker are rebalanced at a time. Is there an endpoint on Connect that can be integrated with a health probe? Or are there any other suggestions for handling rolling restarts like this to minimize end-to-end freshness lag?
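For the rolling-restart gating, one option we have sketched is to poll each worker's connector/task states between pod restarts and only proceed once everything is RUNNING again. GET /connectors/<name>/status is a standard Connect REST endpoint; the URL below and the overall gating logic are placeholders/assumptions, not a drop-in solution:

```python
import json
from urllib.request import urlopen

# Placeholder: point this at the REST address of any live worker in the group.
CONNECT_URL = "http://localhost:8083"

def connector_status(name: str) -> dict:
    # GET /connectors/<name>/status is part of the standard Connect REST API;
    # it reports the state of the connector and each of its tasks.
    with urlopen(f"{CONNECT_URL}/connectors/{name}/status") as resp:
        return json.load(resp)

def all_tasks_running(status: dict) -> bool:
    """True if the connector and every one of its tasks report state RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)
```

A restart orchestrator could call `all_tasks_running(connector_status(name))` in a retry loop after each pod comes back, before letting the rollout touch the next pod.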

Thanks!

Are you using Confluent for Kubernetes, or Strimzi?

We are using Confluent’s Docker image

And launch it via confluent/docker/launch → connect-distributed on k8s

Hi @ywu-stripe

could you provide some more details about your environment?
what does your setup in k8s look like?


Happy to share - is there anything in particular that you are looking for?

I don’t think we have any special setup on k8s. There are two cases where a pod will get recycled: 1/ the pod is unhealthy, or 2/ we periodically recycle pods that have been running for more than X days

basically how you’ve started the whole stack and the config :slight_smile:

This is our setup:

        container_env_vars({
            "CONNECT_BOOTSTRAP_SERVERS": BOOTSTRAP_SERVERS_BY_ENV.get(get_env(ctx)),
            "CONNECT_REST_PORT": str(port),
            "CONNECT_GROUP_ID": datastore.replace(".", "-"),
            "CONNECT_CONFIG_STORAGE_TOPIC": get_internal_topic(ctx, datastore, "__cluster_configs"),
            "CONNECT_OFFSET_STORAGE_TOPIC": get_internal_topic(ctx, datastore, "__cluster_offsets"),
            "CONNECT_STATUS_STORAGE_TOPIC": get_internal_topic(ctx, datastore, "__cluster_status"),
            "CONNECT_KEY_CONVERTER": json_converter,
            "CONNECT_VALUE_CONVERTER": json_converter,
            "CONNECT_INTERNAL_KEY_CONVERTER": json_converter,
            "CONNECT_INTERNAL_VALUE_CONVERTER": json_converter,
            "CONNECT_PLUGIN_PATH": "/usr/share/java",
            "CONNECT_LOG4J_LOGGERS": ",".join(CONNECT_LOG4J_LOGGERS),
            "CONNECT_SCHEDULED_REBALANCE_MAX_DELAY_MS": "25000",
            "JMX_PORT": str(jmx_port),
            "KAFKA_HEAP_OPTS": "-Xms{} -Xmx{}".format(java_heap_size, java_heap_size),
        }, from_fields = {
            "CONNECT_REST_ADVERTISED_HOST_NAME": "status.podIP",
        }, container_name = name),
        run_as_unprivileged(),
        container_port(port),
        healthchecked_service(
            name = name,
            port = port,
            path = "/",
        ),
        probes(
            readiness = http_probe(
                path = "/",
                port = port,
                successThreshold = 1,
                failureThreshold = 3
            ),
        ),
        strategy = rolling_update_strategy(
            max_unavailable = 1,
            max_surge = 0,
        ),
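One tweak we are considering for the readiness probe above (a sketch; `probes`/`http_probe` are our internal helpers, and whether this is the right readiness signal for Connect is an assumption on our part): GET `/` answers as soon as the REST server is listening, even before the worker has fully joined the group, whereas GET `/connectors` additionally requires the worker to have caught up on the config topic, so it may be a stricter check:

        probes(
            readiness = http_probe(
                # "/" responds as soon as the REST server binds; "/connectors"
                # also needs the worker to have read the config topic, so it is
                # a stricter (though still imperfect) readiness signal.
                path = "/connectors",
                port = port,
                successThreshold = 1,
                failureThreshold = 3
            ),
        ),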

thanks,
could you also provide some insight into your k8s setup?

best,
michael

Sorry for the noob question, but I’m not super familiar with the k8s setup :sweat_smile:

I think we don’t have any special config for k8s… is there any kubectl command I can use to get what might be useful, or any hint on where I should look?

Thanks.

ah, sorry for the non-specific question
I’m not looking for the k8s setup itself, but for the whole Kafka setup in k8s :wink:

best,
michael

Ah, our Kafka is not running on k8s but on dedicated hosts; only the Connect cluster is running on k8s.

Here are the configs of the Kafka internal topics, if that helps:

$ ./kafka-configs.sh --bootstrap-server $broker:9094 --describe --entity-type topics --entity-name test.__cluster_configs
Dynamic configs for topic test.__cluster_configs are:
  min.insync.replicas=2 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:min.insync.replicas=2, STATIC_BROKER_CONFIG:min.insync.replicas=2, DEFAULT_CONFIG:min.insync.replicas=1}
  cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
  retention.ms=-1 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=-1}

$./kafka-configs.sh --bootstrap-server kafkacdc-northwest-green.service.qa-northwest.consul:9094 --describe --entity-type topics --entity-name test.__cluster_status
Dynamic configs for topic test.__cluster_status are:
  min.insync.replicas=2 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:min.insync.replicas=2, STATIC_BROKER_CONFIG:min.insync.replicas=2, DEFAULT_CONFIG:min.insync.replicas=1}
  cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
  retention.ms=-1 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=-1}
