Kafka Streams FK-join: force result timestamp to always come from main (left) flow

Hi, we use Kafka Streams 4.1.1 and need guidance on FK-join timestamp behaviour.

Concrete example (generalised):

  • Main flow record (authoritative entity): timestamp T_main=1000
  • Enrichment flow record (auxiliary update): timestamp T_enrich=2000
  • Later, another main flow update arrives with T_main=1500
  • We then see an out-of-order warning pattern on the FK internal response path: oldTs=2000 -> newTs=1500

Our model:

  • The main flow is the source of truth for business time and should retain the original event time.
  • The enrichment flow is technical/auxiliary; its timestamp should not override business time.
  • One enrichment update can fan out to many main entities.

Goal:

  • On each FK join result, always use the timestamp from the main (left) flow.
  • Prevent enrichment-side timestamps from advancing the effective timestamp of the main entity state.

Questions:

  1. Is there any built-in option in KTable.leftJoin(... foreignKeyExtractor ...) to force the output timestamp to come from the left/main side?
  2. If not, what is the recommended approach:
    • normalise enrichment timestamps before the FK join, or
    • reset the timestamp after the join via the Processor API (withTimestamp(...))?
  3. Is this expected FK internal behaviour (...subscription-response-topic) or related to known issues around FK join ordering?

Topology shape:

  • Main KTable leftJoin enrichment KTable via FK extractor.

Any proven pattern for an “authoritative main-side timestamp” in FK joins would be appreciated.

What you observe is by design. The join computes the result record timestamp as max(left.ts, right.ts), which in general is semantically correct (the join has built-in event-time semantics). Semantically, the result cannot exist before both the left and the right record exist.

In your example, if left.ts=1000 and right.ts=1500, then at ts=1000 there is no join result, because the right record does not exist yet. The join can only happen at 1500, when both records exist. Does this make sense?
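
To make the rule concrete, here is a minimal Java sketch that only models the arithmetic. It is not Kafka Streams code, and the class and method names are made up for illustration. It applies max(left.ts, right.ts) to the timestamps mentioned in this thread:

```java
class JoinTimestampRule {

    // Result timestamp as described above: the later of the two input timestamps.
    static long resultTimestamp(long leftTs, long rightTs) {
        return Math.max(leftTs, rightTs);
    }

    public static void main(String[] args) {
        // Numbers from the question: main=1000, enrichment=2000.
        System.out.println(resultTimestamp(1000L, 2000L)); // prints 2000
        // Later main update at 1500 joined with the same enrichment record.
        System.out.println(resultTimestamp(1500L, 2000L)); // prints 2000
        // Numbers from the reply: left=1000, right=1500.
        System.out.println(resultTimestamp(1000L, 1500L)); // prints 1500
    }
}
```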

Given your business requirements, it seems you either don’t want proper event-time semantics, or your right input data is not timestamped correctly.

I guess you have a few options. The simplest one might be to modify the ts of the right input records (maybe even set them to zero, depending on your business requirements) before the data gets into the table:

KTable rightJoinInput = builder.stream("rightTableTopic").processValues(…).toTable();

The processValues() step allows you to modify the timestamps. (In older versions it might be transformValues() instead of processValues().)
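
A minimal sketch of that step, assuming String keys and values with default serdes configured, and using the FixedKeyProcessor API behind processValues(). The class name ZeroTimestamp and the choice of 0 are illustrative only; pick whatever timestamp fits your requirements:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

public class RightInputTimestampReset {

    // Forwards every record unchanged except for its timestamp, which is set to 0.
    static class ZeroTimestamp<K, V> implements FixedKeyProcessor<K, V, V> {
        private FixedKeyProcessorContext<K, V> context;

        @Override
        public void init(FixedKeyProcessorContext<K, V> context) {
            this.context = context;
        }

        @Override
        public void process(FixedKeyRecord<K, V> record) {
            context.forward(record.withTimestamp(0L));
        }
    }

    // Builds the right-side table from the topic named in the snippet above.
    static KTable<String, String> buildRightTable(StreamsBuilder builder) {
        return builder.<String, String>stream("rightTableTopic")
                .processValues(() -> new ZeroTimestamp<String, String>())
                .toTable();
    }
}
```

Because processValues() cannot change the key, toTable() should not need a repartition step here.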

If this does not work, you could do some pre-processing of your left input:

  • move the ts into the value before the join (processValues() can help again).
  • remove the ts field from the value and set the proper record ts after the join (again, processValues() is your friend).
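
The two steps can be illustrated with a plain-Java model. It has no Kafka dependency: Rec and Stamped are hypothetical stand-ins for a Streams record and a value wrapper, and join() simply applies the max(left.ts, right.ts) rule. In a real topology, stash() and restore() would be the bodies of two processValues() processors, one before and one after the join:

```java
import java.util.function.BiFunction;

class CarryTimestampSketch {

    // Hypothetical stand-in for a Streams record: a value plus its record timestamp.
    record Rec<V>(V value, long timestamp) {}

    // Hypothetical wrapper that carries the left-side event time inside the value.
    record Stamped<V>(V value, long eventTime) {}

    // Step 1 (before the join): copy the record timestamp into the value.
    static <V> Rec<Stamped<V>> stash(Rec<V> left) {
        return new Rec<>(new Stamped<>(left.value(), left.timestamp()), left.timestamp());
    }

    // The join, modelled with the max(left.ts, right.ts) rule.
    static <L, R, O> Rec<O> join(Rec<L> left, Rec<R> right, BiFunction<L, R, O> joiner) {
        return new Rec<>(joiner.apply(left.value(), right.value()),
                Math.max(left.timestamp(), right.timestamp()));
    }

    // Step 2 (after the join): unwrap the value and restore the carried event time.
    static <V> Rec<V> restore(Rec<Stamped<V>> joined) {
        return new Rec<>(joined.value().value(), joined.value().eventTime());
    }

    public static void main(String[] args) {
        Rec<String> left = new Rec<>("main", 1500L);
        Rec<String> right = new Rec<>("enrich", 2000L);
        Rec<Stamped<String>> joined = join(stash(left), right,
                (l, r) -> new Stamped<String>(l.value() + "+" + r, l.eventTime()));
        System.out.println(joined.timestamp());          // prints 2000
        System.out.println(restore(joined).timestamp()); // prints 1500
    }
}
```

The joiner has to copy the carried event time into the join result, otherwise there is nothing left to restore after the join.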

Thanks, this clarifies the semantic intent.

However, after tracing our case through the Kafka Streams (v4.1.1) FK-join internals, I think there is an important distinction between the semantic “join result” and the internal …subscription-response-topic.

In our case we force the right-side table record timestamp to 0, and then we observe records with ts=0 on the internal FK topic, for example:

offer-structure-write-v4x0-offer-category-store-cat-sport-order-subscription-response-topic

What we found in the Kafka Streams sources is:

  • SubscriptionJoinProcessorSupplier computes
    resultTimestamp = max(left.ts, right.ts)

  • but ForeignTableJoinProcessorSupplier (the path triggered by a right-table update) forwards with:
    record.withKey(…).withValue(…)
    and does not recompute the timestamp

  • then ResponseJoinProcessorSupplier also forwards with:
    record.withValue(result)
    so it preserves the incoming response-record timestamp

So it seems that for FK joins there are two paths:

  1. left/subscription-triggered path:
    timestamp is computed as max(left.ts, right.ts)

  2. right-triggered path:
    the internal subscription-response-topic keeps the right-record timestamp

This would explain why, if we force right.ts = 0, the internal …subscription-response-topic also gets ts=0.

So my question is: is this understanding correct?
In other words, does the max(left.ts, right.ts) rule apply to the left-triggered FK path, while right-triggered FK responses preserve the right timestamp in the internal response topic?

Best,
Stas

What you say is correct, but that’s just an implementation detail you don’t need to worry about. What’s in the response topic is internal by definition, so I am not sure why you care about it.

> In other words, does the max(left.ts, right.ts) rule apply to the left-triggered FK path, while right-triggered FK responses preserve the right timestamp in the internal response topic?

It applies to both, but it’s only computed on the left hand side.

  1. When a left-hand side update happens, the FK join first sends a subscription request to the right-hand side. The right-hand side processes the subscription update and sends a subscription response. Afterwards, the left-hand side processes the response to compute the join result. Of course, the response must encode the right-hand side's record ts, to allow the left-hand side to compute max(l.ts, r.ts).
  2. When a right-hand side update happens, the FK join checks the existing subscriptions and also sends a response to the left-hand side to trigger a join result computation. Again, the left-hand side picks up the response to compute the join result, including max(l.ts, r.ts). As a matter of fact, the left-hand side does not even know, or care, whether a response was triggered by a left-hand or right-hand side update, and it always includes the ts computation, which is part of the join result computation.

Note that the response topic does not contain join results. It only contains the right-hand side record which must be joined, and the left-hand side picks up that record to compute the join result.
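
The flow described above can be modelled in a few lines of plain Java. This is a deliberately simplified sketch, not the actual Kafka Streams implementation: there are no topics and no tombstones, results are only emitted when a right record exists (inner-join style), and all names are made up. It only shows that both trigger paths end in the same left-side step, which is where max(l.ts, r.ts) is computed:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FkJoinFlowSketch {

    record Rec(String value, long ts) {}

    // What travels to the left side: the right record to join, not a join result.
    record Response(String leftKey, Rec right) {}

    final Map<String, Rec> leftTable = new HashMap<>();    // left key -> left record
    final Map<String, String> fkOf = new HashMap<>();      // left key -> foreign key (subscription)
    final Map<String, Rec> rightTable = new HashMap<>();   // foreign key -> right record
    final List<Rec> results = new ArrayList<>();

    // Left-triggered path: register the subscription; the right side answers with a response.
    void onLeft(String key, String fk, Rec left) {
        leftTable.put(key, left);
        fkOf.put(key, fk);
        Rec right = rightTable.get(fk);
        if (right != null) {
            onResponse(new Response(key, right));
        }
    }

    // Right-triggered path: one response per existing subscription (fan-out).
    void onRight(String fk, Rec right) {
        rightTable.put(fk, right);
        fkOf.forEach((key, subscribedFk) -> {
            if (subscribedFk.equals(fk)) {
                onResponse(new Response(key, right));
            }
        });
    }

    // Left side: identical for both paths; the result timestamp is computed here.
    void onResponse(Response response) {
        Rec left = leftTable.get(response.leftKey());
        results.add(new Rec(left.value() + "|" + response.right().value(),
                Math.max(left.ts(), response.right().ts())));
    }

    public static void main(String[] args) {
        FkJoinFlowSketch join = new FkJoinFlowSketch();
        join.onLeft("m1", "c1", new Rec("main-v1", 1000L));  // no right record yet, no result
        join.onRight("c1", new Rec("enrich", 2000L));        // right-triggered result
        join.onLeft("m1", "c1", new Rec("main-v2", 1500L));  // left-triggered result
        join.results.forEach(r -> System.out.println(r.value() + " @ " + r.ts()));
    }
}
```

With the timestamps from the original question, both results in this model carry ts=2000, regardless of which side triggered them.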

Does this help?