If you are writing your own connector, then your SourceConnector subclass can monitor the source system for new, changed, or removed source partitions (likely in a separate thread; see below). If it detects any change in the source partitions, the connector can then request a “reconfiguration” of the connector’s tasks. This is the only way a connector can signal that Connect should call the connector’s taskConfigs(...) method again, stop all tasks, and restart the tasks as defined by the task configurations returned by that call.
Your connector requests a reconfiguration by calling the requestTaskReconfiguration() method on the connector’s ConnectorContext object. If you can rely on running in Kafka 2.6.0 or later, you can get the context via the SourceConnector.context() method; if your connector must also run in Kafka 2.5.x or earlier, get it via the protected context field instead.
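The monitoring logic described above might look roughly like the following Java sketch. To keep it compilable without Kafka on the classpath, it assumes a stand-in ReconfigurationContext interface in place of Kafka’s ConnectorContext (the real interface does declare requestTaskReconfiguration()), plus a hypothetical partitionLister that queries the source system for its current partition identifiers. It only detects partitions being added or removed; spotting partitions whose properties changed would need a richer comparison.

```java
import java.util.Set;
import java.util.function.Supplier;

// Stand-in for org.apache.kafka.connect.connector.ConnectorContext, declared here
// only so the sketch compiles with the JDK alone.
interface ReconfigurationContext {
    void requestTaskReconfiguration();
}

// Hypothetical helper: compares the source system's current partitions with the
// last set it saw, and asks Connect for a task reconfiguration when they differ.
class PartitionMonitor {
    private final Supplier<Set<String>> partitionLister; // hypothetical source-system query
    private final ReconfigurationContext context;
    // volatile: written by the monitor thread, read later when taskConfigs(...) runs
    private volatile Set<String> knownPartitions;

    PartitionMonitor(Supplier<Set<String>> partitionLister, ReconfigurationContext context) {
        this.partitionLister = partitionLister;
        this.context = context;
        this.knownPartitions = Set.copyOf(partitionLister.get());
    }

    /** The partitions the connector currently knows about. */
    Set<String> knownPartitions() {
        return knownPartitions;
    }

    /** One polling pass; returns true if a reconfiguration was requested. */
    boolean checkOnce() {
        Set<String> current = Set.copyOf(partitionLister.get());
        if (current.equals(knownPartitions)) {
            return false; // no partitions added or removed
        }
        // Record the new state first, so taskConfigs(...) sees it when Connect calls back.
        knownPartitions = current;
        context.requestTaskReconfiguration();
        return true;
    }
}
```

In a real connector, checkOnce() would be called periodically from the monitoring thread, and the context would be the one obtained from context() or the context field.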
First, keep in mind that Connect calls taskConfigs(...) on the same SourceConnector instance that called requestTaskReconfiguration(), so any state held by that connector instance will be available when taskConfigs(...) is called. (It’s true that the connector could be stopped and restarted for a variety of reasons, and this might interrupt the reconfiguration process. But in that case you don’t really care: upon restart your connector would presumably discover the current set of source partitions, and taskConfigs(...) will be called before the tasks are started.)
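Because the same instance handles both calls, the partition set recorded by the monitor can simply be read back inside taskConfigs(...). The sketch below mirrors the shape of SourceConnector#taskConfigs(int maxTasks) but is a plain class; the "partitions" task config key and the round-robin grouping are illustrative assumptions, not anything Connect requires. (Kafka’s ConnectorUtils.groupPartitions(...) helper can do the grouping in a real connector; it is not used here so the example needs only the JDK.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical connector fragment: the monitor stores the latest partition set in a
// field, and taskConfigs(...) later reads that same field on the same instance.
class ExampleSourceConnectorState {
    // Made-up task config key for this sketch.
    static final String PARTITIONS_KEY = "partitions";

    private volatile Set<String> knownPartitions = Set.of();

    /** Called from the monitoring thread when it detects a change. */
    void updatePartitions(Set<String> partitions) {
        knownPartitions = Set.copyOf(partitions);
    }

    /** Same shape as SourceConnector#taskConfigs(int maxTasks). */
    List<Map<String, String>> taskConfigs(int maxTasks) {
        List<String> sorted = new ArrayList<>(new TreeSet<>(knownPartitions)); // stable order
        int numTasks = Math.min(maxTasks, sorted.size()); // never create idle tasks
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            groups.add(new ArrayList<>());
        }
        for (int i = 0; i < sorted.size(); i++) {
            groups.get(i % numTasks).add(sorted.get(i)); // round-robin across tasks
        }
        List<Map<String, String>> configs = new ArrayList<>();
        for (List<String> group : groups) {
            configs.add(Map.of(PARTITIONS_KEY, String.join(",", group)));
        }
        return configs;
    }
}
```

Sorting before grouping keeps the output deterministic, so an unchanged partition set yields identical task configs and does not look like a change to Connect.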
Second, if your connector does start a thread to monitor the source system, make sure this thread is always stopped when the Connector#stop() method is called. One way to do this is to run the monitoring logic on a single-thread executor, and always shut that executor down in Connector#stop().
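A minimal sketch of that lifecycle, using only java.util.concurrent. The class and its checkPartitions task are hypothetical; in a real connector the bodies of start() and stop() would live in your SourceConnector#start(Map) and Connector#stop() overrides (the real start(...) receives the connector configuration, omitted here). The 100 ms poll interval and 5-second shutdown timeout are arbitrary choices for the example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical lifecycle holder: the monitor runs on a single-thread executor created
// in start(), and stop() always shuts that executor down.
class MonitoringLifecycle {
    private final Runnable checkPartitions; // hypothetical monitoring pass
    private ScheduledExecutorService executor;

    MonitoringLifecycle(Runnable checkPartitions) {
        this.checkPartitions = checkPartitions;
    }

    void start() {
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "partition-monitor");
            t.setDaemon(true); // the monitor thread alone never keeps the JVM alive
            return t;
        });
        executor.scheduleWithFixedDelay(checkPartitions, 0, 100, TimeUnit.MILLISECONDS);
    }

    void stop() {
        if (executor == null) {
            return; // stop() can be called even if start() never ran or failed early
        }
        executor.shutdownNow(); // cancels future runs and interrupts a run in progress
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
    }

    boolean isStopped() {
        return executor == null || executor.isTerminated();
    }
}
```

Because shutdownNow() works by interrupting the monitor thread, the monitoring code should respond to interruption (for example, by letting InterruptedException propagate) so that stop() returns promptly.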
Connector developers don’t need to know how Connect actually makes the above happen across the Connect worker cluster, but in case you’re curious, it basically does the following:
1. The worker in which the connector instance is running:
   a. loads the connector configuration from the internal topic.
   b. calls the connector’s taskConfigs(...) method with this connector configuration, and gets the connector’s latest configuration for each task.
   c. if the number of task configs has changed, or any of the task configurations have changed, asks the leader to store the updated task configs for the connector.
2. The leader will eventually:
   a. see/read the updated task configs.
   b. “rebalance” the tasks by updating the workers’ assignments, adding assignments for any new tasks (if the reconfiguration resulted in more tasks) and revoking any assignments for existing tasks that are no longer needed (if it resulted in fewer task configs).
3. The various workers receive their updated assignments and:
   a. stop tasks that were revoked from that worker.
   b. start tasks that are new to that worker.
   c. stop and restart tasks that were and still are assigned to that worker.
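The bookkeeping in these steps can be modeled with a little set arithmetic. This is a simplified illustration, not Connect’s actual implementation: it treats the “has anything changed” check in the first step as a plain equality test on the ordered list of task configs, and, for the workers’ step, computes which task IDs (made-up names such as "c-0") one worker must stop, start, or restart given its assignment before and after the rebalance. It ignores the protocol differences noted next.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified model of the reconfiguration steps; NOT Connect's actual implementation.
class ReconfigurationModel {

    /**
     * Ask the leader to store new task configs only if something changed. List equality
     * catches a different number of tasks as well as any task whose config map differs;
     * order matters because the list index identifies the task.
     */
    static boolean taskConfigsChanged(List<Map<String, String>> stored,
                                      List<Map<String, String>> latest) {
        return !stored.equals(latest);
    }

    /** What one worker does after its assignment changes (hypothetical type). */
    static final class WorkerActions {
        final Set<String> stop;    // revoked from this worker
        final Set<String> start;   // new to this worker
        final Set<String> restart; // assigned before and after: restarted with new configs

        WorkerActions(Set<String> stop, Set<String> start, Set<String> restart) {
            this.stop = stop;
            this.start = start;
            this.restart = restart;
        }
    }

    static WorkerActions actionsFor(Set<String> before, Set<String> after) {
        Set<String> revoked = new TreeSet<>(before);
        revoked.removeAll(after);   // assigned before, not after
        Set<String> added = new TreeSet<>(after);
        added.removeAll(before);    // assigned after, not before
        Set<String> kept = new TreeSet<>(before);
        kept.retainAll(after);      // assigned both before and after
        return new WorkerActions(revoked, added, kept);
    }
}
```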
Technically, step 3 differs a bit depending upon which Connect protocol is used (e.g.,