Unable to pause or resume connectors

Hello folks,

We recently scaled down the number of kafka-connect workers from 3 to just 1. Since then we have been unable to pause or resume connectors using the REST API.

Pause/resume requests to the REST API complete successfully and return an HTTP status code of 202, but the status of the connectors does not change, and we see no relevant messages in the Connect worker logs, the Connect config topic, or the Connect status topic. It is as if the pause/resume requests were never sent at all. I tried deleting the Connect worker container and re-creating it, but that did not help.
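For reference, the requests in question look like this (a sketch; the host/port and the connector name `my-connector` are placeholders for our actual setup):

```shell
# Pause a connector - returns 202 Accepted, but the state never changes
curl -i -X PUT http://localhost:8083/connectors/my-connector/pause

# Resume it - same result: 202, no state change
curl -i -X PUT http://localhost:8083/connectors/my-connector/resume

# Status still reports the old state
curl -s http://localhost:8083/connectors/my-connector/status
```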

Connect worker image: confluentinc/cp-kafka-connect:6.2.0

I’d greatly appreciate any input on this, and ideally a way to fix the issue.

Regards,
Shiva

After a few more attempts to fix this issue, here’s a summary of what we found.

Conclusion: Kafka Connect REST API is broken after scaling down connect workers

WORKING:

  • GET requests to verify status of connectors

NOT WORKING:

  • PUT requests to pause or resume connectors
    • Requests complete successfully - return status code 202
    • Nothing in the Kafka Connect logs about receiving the pause/resume requests
  • Requests to create or delete connectors
    • Requests fail - return status code 500
    • Nothing in the Kafka Connect logs about the failed requests

We decided to rebuild our connectors from scratch, as there didn’t seem to be any other way to recover from this.

-Shiva

This does sound strange. Perhaps too late now but it might be worth double-checking against Common mistakes made when configuring multiple Kafka Connect workers in case that could cause this behaviour.

@shiva.lingala can you post your configurations (secrets redacted) so we can try and reproduce?

Thanks

I ran a test by starting three Connect workers on cp-kafka-connect:6.2.0, creating a connector, pausing it, deleting two of the worker containers, and then resuming the connector.
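Roughly, the repro looked like this (a sketch; the container names match the compose file later in this thread, and `test-connector` is a placeholder name):

```shell
# Create a throwaway connector (definition omitted), then pause it
curl -s -X PUT http://localhost:8083/connectors/test-connector/pause

# Kill two of the three workers
docker rm -f connect-02 connect-03

# Resume via the surviving worker, then watch its status
curl -s -X PUT http://localhost:8083/connectors/test-connector/resume
curl -s http://localhost:8083/connectors/test-connector/status
```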

The connector’s status was not immediately updated in the REST API (not only was it still paused, it was still assigned to a now-deleted worker), which seemed to align with your findings. However, after scheduled.rebalance.max.delay.ms had elapsed, the connector was reassigned to the only remaining worker and its status was updated to RUNNING.

I ran the same repro scenario again with CONNECT_SCHEDULED_REBALANCE_MAX_DELAY_MS set to 0. This time there was a much smaller delay between the worker dying and the connector and its tasks being reassigned to a new worker (less than ten seconds), and no noticeable delay between the PUT /connectors/{connector}/resume request returning and the connector being resumed.

In summary, this looks like a consequence of the scheduled rebalance delay, which prevents connectors and tasks from being automatically reassigned to other workers in case the worker they were running on is brought back up again soon. The delay is configurable: you can reduce or even eliminate it by adjusting the CONNECT_SCHEDULED_REBALANCE_MAX_DELAY_MS environment variable on your Connect workers.
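For example, in a docker-compose setup like the one in this thread, that would be something like the following (0 disables the delay entirely; the default for scheduled.rebalance.max.delay.ms is 5 minutes):

```yaml
    environment:
      # Disable the scheduled rebalance delay so connectors on a dead
      # worker are reassigned immediately instead of after 5 minutes
      CONNECT_SCHEDULED_REBALANCE_MAX_DELAY_MS: 0
```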


Hello Rick,

Thanks a lot for the information. I was not aware of the CONNECT_SCHEDULED_REBALANCE_MAX_DELAY_MS setting. I will surely use it going forward.

I think what we saw was a little more bizarre. Robin, I did follow your guide when setting up the Connect workers. We are using Kafka Connect to replicate data to our Snowflake warehouse using the Debezium and Snowflake connectors.

We started with a single Kafka Connect instance, since the data volume is small (and so is our budget), but saw out-of-memory errors when running the Snowflake connectors. So I added two more Connect workers, even though the whole setup runs on a single node, and this did help with the out-of-memory errors. I am not sure whether this could cause such behavior.

The reason we scaled down was that, after everything had run smoothly for 4 days, around 10 of our connectors moved to UNASSIGNED status and we could not do anything about it. Restarting the connectors did not help. When I removed the extra Connect instances, the connectors got assigned to the only remaining worker and all of them moved back to either RUNNING or PAUSED status.

This is when we noticed that we could not pause/resume connectors, nor delete or create connectors, which prompted me to post the issue here. We did wait 2 days before deciding to pull the plug and reload all the data, so the rebalance delay may not be the root of the problem.

Here’s the connect worker configuration I used.

  connect-01:
    container_name: connect-01
    image: confluentinc/cp-kafka-connect:${KAFKA_VERSION}
    networks:
      - pipeline
    restart: unless-stopped
    depends_on:
      - kafka
      - zookeeper
    ports:
    - 8083:8083
    environment:
      CONNECT_GROUP_ID: 1
      CONNECT_REST_PORT: 8083
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-01
      CONNECT_STATUS_STORAGE_TOPIC: ConnectStatus
      CONNECT_OFFSET_STORAGE_TOPIC: ConnectOffsets
      CONNECT_CONFIG_STORAGE_TOPIC: ConnectConfigs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars/
      KAFKA_HEAP_OPTS: ${CONNECT_HEAP_SETTINGS}
    volumes:
      - $PWD/plugins/debezium/$DBZ_VERSION:/etc/kafka-connect/jars/debezium
      - $PWD/plugins/snowflake/$SF_VERSION:/etc/kafka-connect/jars/snowflake

  connect-02:
    container_name: connect-02
    image: confluentinc/cp-kafka-connect:${KAFKA_VERSION}
    networks:
      - pipeline
    restart: unless-stopped
    depends_on:
      - kafka
      - zookeeper
    ports:
    - 28083:8083
    environment:
      CONNECT_GROUP_ID: 1
      CONNECT_REST_PORT: 8083
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-02
      CONNECT_STATUS_STORAGE_TOPIC: ConnectStatus
      CONNECT_OFFSET_STORAGE_TOPIC: ConnectOffsets
      CONNECT_CONFIG_STORAGE_TOPIC: ConnectConfigs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars/
    volumes:
      - $PWD/plugins/debezium/$DBZ_VERSION:/etc/kafka-connect/jars/debezium
      - $PWD/plugins/snowflake/$SF_VERSION:/etc/kafka-connect/jars/snowflake

  connect-03:
    container_name: connect-03
    image: confluentinc/cp-kafka-connect:${KAFKA_VERSION}
    networks:
      - pipeline
    restart: unless-stopped
    depends_on:
      - kafka
      - zookeeper
    ports:
    - 38083:8083
    environment:
      CONNECT_GROUP_ID: 1
      CONNECT_REST_PORT: 8083
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect-03
      CONNECT_STATUS_STORAGE_TOPIC: ConnectStatus
      CONNECT_OFFSET_STORAGE_TOPIC: ConnectOffsets
      CONNECT_CONFIG_STORAGE_TOPIC: ConnectConfigs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars/
    volumes:
      - $PWD/plugins/debezium/$DBZ_VERSION:/etc/kafka-connect/jars/debezium
      - $PWD/plugins/snowflake/$SF_VERSION:/etc/kafka-connect/jars/snowflake

Let me know if you need any other information.

-Shiva