How to run Kafka Streams ProcessorContext `schedule()` at fixed times

Hi :slight_smile:

I have an application that operates within 10s interval windows. Within a 10s window, the application should aggregate and transform the data before publishing the result at second 9. Since the aggregation and transformation are rather complex, I have chosen to use the Processor API.

Is it possible to configure, or implement logic for, the ProcessorContext schedule() to run at fixed times? For instance, like a cron job every ten seconds?

I have seen KAFKA-7699, but it looks to be stale?

I have previously gotten an answer from @bbejeck:

Not exactly. The best you could do with the schedule(interval, type, callback) method would be to estimate how often you’d like the callback to execute via the Duration interval parameter and choose PunctuationType.WALL_CLOCK_TIME. Note that using WALL_CLOCK_TIME is best effort only, “as its granularity is limited by how long an iteration of the processing loop takes to complete”.

Is there a workaround or “best practice” to achieve a schedule that runs at fixed times? And are there any plans to contribute to KAFKA-7699?

Kafka is community driven, and if anybody picks it up, we are happy to support. It did not come up very often, so it’s not a high priority task from our side atm (but your request just bumped the priority up of course). While the ticket needs a small KIP, it does not seem too hard to implement, so if you want to pick it up yourself, we are happy to support you.

Otherwise there is not much you can do. One hack you could apply is to call context.currentSystemTimeMs() when you register the punctuation for the very first time and, instead of using a 10 sec duration, use only the difference between the current system time and the next “anchor” point as the interval. When the punctuation fires the first time, you register a new punctuation, now with 10 sec as desired, and cancel the first punctuation. It’s not really elegant and more a hack, but it might be a good enough workaround until KAFKA-7699 is resolved.
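The “diff to the next anchor point” is plain modular arithmetic on the wall-clock timestamp. A minimal sketch of that calculation follows; the class name `AnchorDelay`, the method name, and the `offsetMs` parameter are made up for illustration and are not part of Kafka Streams. The Kafka wiring is shown only as comments and assumes the anchor punctuation's `Cancellable` is stored in a field.

```java
/** Hypothetical helper (not a Kafka Streams class): delay until the next anchor point. */
final class AnchorDelay {
    private AnchorDelay() {}

    /**
     * Returns the milliseconds from nowMs until the next instant t
     * with t % intervalMs == offsetMs. If nowMs is exactly on an anchor,
     * a full interval is returned, so the result is never zero.
     */
    static long delayToNextAnchorMs(long nowMs, long intervalMs, long offsetMs) {
        // floorMod keeps the phase in [0, intervalMs) even when nowMs < offsetMs
        long phase = Math.floorMod(nowMs - offsetMs, intervalMs);
        return intervalMs - phase;
    }

    // Assumed wiring inside Processor#init (needs kafka-streams, not compiled here):
    //
    //   long delay = AnchorDelay.delayToNextAnchorMs(
    //           context.currentSystemTimeMs(), 10_000L, 9_000L);
    //   anchor = context.schedule(Duration.ofMillis(delay),
    //           PunctuationType.WALL_CLOCK_TIME, ts -> {
    //               anchor.cancel(); // drop the one-off anchor punctuation
    //               context.schedule(Duration.ofSeconds(10),
    //                       PunctuationType.WALL_CLOCK_TIME, this::publish);
    //           });
}
```

For a wall-clock time that is 5 seconds into a 10s window and an anchor at second 7, `delayToNextAnchorMs(5_000L, 10_000L, 7_000L)` gives 2000 ms. Since punctuation is best effort, the anchor will still fire slightly late rather than exactly on the computed instant.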

The hacky solution seems to be working! :slight_smile:

One shortcoming worth mentioning is that the first “anchorSchedule” needs a big enough time margin at startup to be allowed to fire and sync up to the anchor point. I’ll try to explain it with an example:

Assume we wish to publish records at the seventh second in each 10s interval (i.e. 09:00:07, 09:00:17, etc). If the application starts up at 09:00:05, then the anchorSchedule will be registered and set to trigger two seconds later at 09:00:07. Since schedule/punctuate calls are only best effort, and there are no guarantees that they will trigger at exactly their specified time, the anchorSchedule ends up triggering out of sync, seemingly due to startup costs. A viable solution in my case was to set the anchorSchedule to sync up on the next interval, i.e. 09:00:17. This means that we will miss the publication at 09:00:07, but that is OK in my case :slight_smile:
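A rough sketch of that “skip to the next interval” rule, in case it helps someone else. The class name `AnchorMargin`, the method name, and the `minMarginMs` parameter are my own hypothetical names; how large the margin should be depends on your startup costs and is an assumption you would have to tune.

```java
/** Hypothetical helper: delay to the next anchor, skipping anchors that are too close. */
final class AnchorMargin {
    private AnchorMargin() {}

    /**
     * Returns the delay in milliseconds to the first anchor point
     * (t % intervalMs == offsetMs) that is at least minMarginMs away from nowMs.
     */
    static long delayWithMarginMs(long nowMs, long intervalMs, long offsetMs, long minMarginMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        // Delay to the very next anchor, in (0, intervalMs]
        long delay = intervalMs - Math.floorMod(nowMs - offsetMs, intervalMs);
        // Too close to startup: push the sync-up to a later anchor
        while (delay < minMarginMs) {
            delay += intervalMs;
        }
        return delay;
    }
}
```

With the example above (startup 5 seconds into the window, anchor at second 7) and an assumed margin of 5 seconds, `delayWithMarginMs(5_000L, 10_000L, 7_000L, 5_000L)` returns 12000 ms, i.e. the sync-up happens at the following anchor instead of two seconds after startup.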

Another approach to the whole problem would be to set up a schedule that triggers every second (or even more frequently), checks whether the system clock is in the seventh second of the window, and runs the publication logic if it is.
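I have not run this second approach in production, but the check itself could look roughly like the sketch below. `PublishGate` and its members are hypothetical names, and the per-window de-duplication is my own addition so that ticks more frequent than once per second do not publish twice. The punctuator would call `shouldPublish(...)` with the wall-clock timestamp it receives.

```java
/** Hypothetical gate: true at most once per window, only inside the target second. */
final class PublishGate {
    private final long intervalMs;
    private final long offsetMs;
    private long lastPublishedWindow = Long.MIN_VALUE;

    PublishGate(long intervalMs, long offsetMs) {
        this.intervalMs = intervalMs;
        this.offsetMs = offsetMs;
    }

    /** Call on every tick with the current wall-clock time in milliseconds. */
    boolean shouldPublish(long nowMs) {
        long window = Math.floorDiv(nowMs, intervalMs); // index of the current window
        long phase = Math.floorMod(nowMs, intervalMs);  // position inside the window
        boolean inTargetSecond = phase >= offsetMs && phase < offsetMs + 1_000L;
        if (inTargetSecond && window != lastPublishedWindow) {
            lastPublishedWindow = window; // remember, so later ticks in this window are skipped
            return true;
        }
        return false;
    }
}
```

One caveat: because punctuation is best effort, a tick can land after the target second has passed, in which case this gate skips that window entirely. Ticking more often than once per second reduces that risk but does not remove it.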
