Restart tasks from within a connector

I have a source connector that processes multiple source partitions by assigning them to a set of tasks. It is possible that a new partition is created in the source and the connector is capable of seeing this change. Based on the new set of partitions, I want to restart one task to schedule the new partition to it.

So far, I see that there is the Connector#reconfigure() method, which is presumably called by the Connect cluster once the connector configuration changes. Calling this method will stop and then restart the entire connector (I assume, including all of its tasks).

Is it the right thing to do? Technically, the connector won’t reconfigure itself but will rather reconfigure its tasks. Are there any more appropriate means to achieve this behavior?

@morozov,

I’m assuming that the connector you have is something you wrote? Most of the behavior that you have described only applies if you are developing a connector yourself, such as the ability to override the connector’s reconfigure() method. As a user of a connector, your hands are tied: you are limited to the behavior that the connector provides out of the box.

If this is the case then yes, you can override the reconfigure() method, which will give you the chance to implement some extra behavior between the stop and the start that the runtime performs on the connector every time the connector settings are changed.

A better option, if I may, is to implement a monitor in your connector in the form of a background thread. This thread would continuously check for changes, such as the presence (or absence) of certain partitions, which in turn should force your connector to reconfigure its number of tasks by calling the requestTaskReconfiguration() method of the provided context.
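
To illustrate the idea, here is a minimal sketch of such a monitor. Everything here is hypothetical: the class name, the 30-second polling interval, and the discoverPartitions() hook are placeholders for whatever your source system actually requires.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.connect.source.SourceConnector;

public abstract class MonitoringSourceConnector extends SourceConnector {

    private final ScheduledExecutorService monitor =
            Executors.newSingleThreadScheduledExecutor();
    private volatile Set<String> knownPartitions;

    @Override
    public void start(Map<String, String> props) {
        knownPartitions = discoverPartitions();
        // Poll the source system periodically; the 30-second interval is arbitrary.
        monitor.scheduleAtFixedRate(this::checkForChanges, 30, 30, TimeUnit.SECONDS);
    }

    private void checkForChanges() {
        Set<String> current = discoverPartitions();
        if (!current.equals(knownPartitions)) {
            knownPartitions = current;
            // Ask the runtime to call taskConfigs(...) again and restart the tasks.
            context.requestTaskReconfiguration();
        }
    }

    @Override
    public void stop() {
        monitor.shutdownNow();
    }

    // Hypothetical hook: query the source system for its current set of partitions.
    protected abstract Set<String> discoverPartitions();
}
```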

Last year I spoke at the London Java Community about developing custom connectors for Kafka Connect, and the recording of this talk can be watched below:

I made sure to get the link at the exact timestamp where I start talking about monitors, but feel free to watch the recording in its entirety if you must 🙂


If you are writing your own connector, then your SourceConnector subclass can monitor for new, changed, or removed source partitions (likely in a separate thread; see below). If it detects any changes in the source partitions, the connector can then request a “reconfiguration” of the connector’s tasks. This is the only way a connector can signal that Connect should call the connector’s taskConfigs(...) method again, stop all tasks, and restart the tasks as defined by the task configurations returned by the taskConfigs(...) call.

Your connector requests a reconfiguration by simply calling the requestTaskReconfiguration() method on the connector’s ConnectorContext object. If you can rely upon running in Kafka 2.6.0 or later, you can get the context via the SourceConnector.context() method; if you need to support your connector running in Kafka 2.5.x or earlier, then get it via the context protected field.
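
For example, inside your connector code (a sketch, not a complete class):

```java
// Kafka 2.6.0 and later: use the accessor method added to Connector.
context().requestTaskReconfiguration();

// Kafka 2.5.x and earlier: use the protected field inherited from Connector.
context.requestTaskReconfiguration();
```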

One thing to keep in mind is that the taskConfigs(...) method is called on the same SourceConnector instance that called the requestTaskReconfiguration() method, so any state held by the connector instance should be available when taskConfigs(...) is called on that instance. (It’s true that the connector could be stopped and restarted for a variety of reasons, and this might interrupt the reconfiguration process. But in that case you sort of don’t care: upon restart, your connector would presumably discover the current set of source partitions, and taskConfigs(...) would be called before the tasks are started.)
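
As a sketch of how that connector state might be used, a taskConfigs(...) implementation could spread the discovered partitions across the tasks round-robin. The knownPartitions field and the "partitions" config key are assumptions here; use whatever your tasks actually expect:

```java
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    // knownPartitions is connector state maintained by the monitoring thread.
    List<String> partitions = new ArrayList<>(knownPartitions);
    int numTasks = Math.min(maxTasks, partitions.size());
    List<Map<String, String>> configs = new ArrayList<>(numTasks);
    for (int i = 0; i < numTasks; i++) {
        configs.add(new HashMap<>());
    }
    // Spread the partitions across the task configurations round-robin.
    for (int i = 0; i < partitions.size(); i++) {
        configs.get(i % numTasks)
               .merge("partitions", partitions.get(i), (a, b) -> a + "," + b);
    }
    return configs;
}
```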

Second, if your connector does start a thread to monitor the source system, make sure this thread is always stopped when the Connector#stop() method is called. One way to do this is to use a single-threaded executor to run the logic, and in Connector#stop() always call ExecutorService#shutdown() or ExecutorService#shutdownNow().
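
For example, a stop() method along those lines might look like this (a sketch; the executor field is assumed to be the connector’s single-threaded executor, and the 5-second grace period is arbitrary):

```java
@Override
public void stop() {
    executor.shutdown();
    try {
        // Give the monitor a moment to finish an in-flight check before forcing it.
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}
```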

Connector developers don’t need to know how Connect actually makes the above happen across the Connect worker cluster, but just in case you’re curious it basically does the following:

  1. The worker in which the connector instance is running:
    a. loads the connector configuration from the internal topic.
    b. calls the connector’s taskConfigs(...) method with this connector configuration, and gets the connector’s latest configuration for each task.
    c. if the number of task configs has changed, or any of the task configurations have changed, requests that the leader store the updated task configs for the connector.
  2. The leader will eventually:
    a. see/read the updated task configs.
    b. “rebalance” the tasks by updating the workers’ assignments for the tasks, adding assignments for any new tasks (if the reconfiguration resulted in more tasks) and revoking the assignments for any existing tasks that are no longer needed (if the reconfiguration resulted in fewer task configs).
  3. The various workers receive their updated assignments and:
    a. stop the tasks that were revoked from that worker.
    b. start the tasks that are new to that worker.
    c. stop and restart the tasks that were and still are assigned to that worker.

Technically, step 3 differs a bit depending upon which Connect protocol is used (e.g., eager, compatible, or sessioned).


Not that I wrote it myself, but yes: I’m working on DBZ-2975, where one of the planned improvements is being able to capture data from all CDC-enabled databases on SQL Server and to monitor the list of such databases.

Yes, that’s the idea.

I believe this is what I was looking for.

Thank you, @riferrei!

Thank you for the detailed explanation, @rhauch!
