Is it possible to use a custom timestamp extractor for windows?

Hello,
We are running confluentinc/cp-ksqldb-server:7.6.0 and are facing the following problem with daily windowing due to UTC timestamps.

We are running aggregations on data that is essentially similar to the following:

{
  "id": "<uuid>"
  "timestamp": "2024-02-27:02:56:00.000Z",
  "quantity": 3.25
}

I want to create a table that groups data with the same id using windowing, as follows:

CREATE OR REPLACE TABLE aggregated WITH (KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO') 
AS SELECT
  ID, AS_VALUE(ID) AS "dataId",
  SUM(QUANTITY) AS "sumQuantity",
  AS_VALUE(windowstart) as "day"
FROM INPUT_DATA
WINDOW TUMBLING ( SIZE 1 DAY, GRACE PERIOD 72 HOURS )
GROUP BY ID;

However, we want to aggregate not by calendar days but by business days, which for us run from 04:00:00.000Z of one day (inclusive) up to 04:00:00.000Z of the next day (exclusive). For example, the sample record above, with its timestamp of 02:56Z on 2024-02-27, belongs to the business day that started at 04:00Z on 2024-02-26.

The problem is essentially what is described in the docs:

ksqlDB is based on the Unix epoch time in the UTC timezone, and this can affect time windows. For example, if you define a 24-hour tumbling time window, it will be in the UTC timezone, which may not be appropriate if you want to have daily windows in your timezone.

Is it possible to use a custom timestamp extractor so that WINDOW uses my definition of a day (i.e. [04:00:00.000Z, 03:59:59.999Z] instead of the default [00:00:00.000Z, 23:59:59.999Z])?

If not, is there a recommended way to deal with this issue?

One way I can think of would be to have a separate table of “days” and join my stream against it based on which day interval a record's timestamp falls into, but since this table and the stream would (by design) have different time semantics, it feels like the wrong approach.

Another way would be to simply offset all of my data by 4 hours. If nothing else, this approach should work, but it requires one of the following:

  • adding a custom timestamp field containing the offset timestamp, or
  • a copy of the data, or
  • manipulating the raw timestamp

The first of these seems like the least offensive and invasive approach. It would be ideal, though, if this logic were encapsulated within the specific ksql query that represents the business case.
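
To illustrate the first option, this is roughly what I have in mind (EVENT_TS and SHIFTED_TS are placeholder names, and it assumes the raw timestamp string has already been parsed into a proper TIMESTAMP column):

-- shift the event time back by 4 hours so that the default UTC midnight
-- window boundary lines up with our 04:00:00.000Z business day boundary
CREATE OR REPLACE STREAM INPUT_DATA_SHIFTED WITH (TIMESTAMP='SHIFTED_TS') AS SELECT
  ID,
  QUANTITY,
  EVENT_TS,
  TIMESTAMPSUB(HOURS, 4, EVENT_TS) AS SHIFTED_TS
FROM INPUT_DATA;

The daily tumbling window would then run against INPUT_DATA_SHIFTED instead, and windowstart would have to be shifted forward again by 4 hours when interpreting the results.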

No, there isn’t a way to plug in an offset like this afaik. There’s a longstanding ksql issue on this topic.

The idea of offsetting by 4 hours should work, though as you say it's a bit offensive :slight_smile:

One question: how do you want to handle Daylight Saving Time? Do you want the business day boundaries to shift with DST, or is it always a fixed 4-hour offset?

Another idea is to do the aggregation without windowing, i.e., without WINDOW. For example, sum and group by CAST(PARSE_TIMESTAMP(...) AS DATE). Something like:

SELECT
  ID, AS_VALUE(ID) AS "dataId",
  SUM(QUANTITY) AS "sumQuantity",
  CAST(CONVERT_TZ(PARSE_TIMESTAMP(timestamp, 'yyyy-MM-dd:HH:mm:ss.SSSX'), 'UTC', 'America/New_York') AS DATE) as "dt"
FROM INPUT_DATA
GROUP BY ID, CAST(CONVERT_TZ(PARSE_TIMESTAMP(timestamp, 'yyyy-MM-dd:HH:mm:ss.SSSX'), 'UTC', 'America/New_York') AS DATE);

A few notes about this:

  • unfortunately you need to group by the function; group by alias doesn’t work
  • this would figure daylight saving into the day boundaries; if you don't want DST to figure in, I think using UTC plus TIMESTAMPADD or TIMESTAMPSUB would work (see the sketch after this list)
  • There are limitations in this approach compared to windowing. E.g., you can't use EMIT FINAL or grace periods. There are ways to solve for this, e.g., selecting the last row per ID / date in a subsequent query, but it does add complexity. The more I think about it, the more I lean toward the time-shift workaround you suggested so that windowing can still be used…
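
For completeness, the fixed-offset (no DST) variant would look roughly like this; the format pattern and the 4-hour shift are assumptions based on your sample record:

SELECT
  ID, AS_VALUE(ID) AS "dataId",
  SUM(QUANTITY) AS "sumQuantity",
  -- shift the UTC timestamp back by 4 hours, then group by the resulting calendar date
  CAST(TIMESTAMPSUB(HOURS, 4, PARSE_TIMESTAMP(timestamp, 'yyyy-MM-dd:HH:mm:ss.SSSX')) AS DATE) AS "businessDay"
FROM INPUT_DATA
GROUP BY ID, CAST(TIMESTAMPSUB(HOURS, 4, PARSE_TIMESTAMP(timestamp, 'yyyy-MM-dd:HH:mm:ss.SSSX')) AS DATE);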

Thanks a lot for the reply. It’s at least a bit comforting to know that I haven’t simply overlooked something trivial. :slight_smile:

Great question. I think the easiest approach is to just offset it by 4 hours regardless of DST.
The source data is always in UTC and I am planning to keep it that way throughout all queries. Since the offset is only needed to sort the data into the proper window, and ksqlDB uses UTC, I think this should suffice.

Come to think of it, what happens if the data has a dynamic/variable offset depending on key? Since the window applies for a given key, this shouldn’t be an issue, correct?

I suppose I’d potentially run into big problems with the grace period if the offset were to change between individual records with the same key, so that probably should not be allowed to happen…

Interesting approach, and it would probably be enough for simpler use cases like downstream anomaly detection (I might even use it for that :slight_smile: ). Unfortunately, given the dimensionality of the grouping keys we're expecting (id is just one of many grouping keys; the example was very simplified), I fear that a non-windowed approach wouldn't be ideal for dealing with the data. Similarly, the business case requires some kind of grace period (e.g. offline clients that buffer data), so I'm out of luck there too.

The idea is to adjust into a target time zone only at query time, hence the need for a granular (i.e. hour-based) aggregation and UTC-based timestamps for the higher-granularity aggregates.
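
To make that concrete, using the simplified aggregated table from my first post, the conversion would only happen when reading the result, along these lines ('Europe/Berlin' is just an example target zone):

-- convert the UTC window start to a target time zone only when reading,
-- keeping the stored aggregates in UTC
SELECT
  "dataId",
  "sumQuantity",
  FORMAT_TIMESTAMP(FROM_UNIXTIME("day"), 'yyyy-MM-dd HH:mm', 'Europe/Berlin') AS "localDay"
FROM AGGREGATED
EMIT CHANGES;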

Indeed. It does add synthetic fields that bloat my data, but I feel like it will at least be a relatively maintainable approach. Hopefully, nothing too scary pops up during testing… time zones can be tricky.

Maybe I can convince management to hold off on this until time zones are abolished… :slight_smile:

:rofl: You might be waiting a while

Yeah I don’t see a way to get dynamic offsets with windowing unless (a) the dynamic aspect of it is done upstream with the synthetic shifted timestamp, i.e., the shift can be done at ingest time and not query time, and (b) the grace period accounts for the differences that can crop up. I’m not sure how hard of a requirement the dynamic / variable offsets are for you (what’s the use case where this comes into play? or are you saying w.r.t DST?) and whether conditions (a) and (b) are doable in your case though. If they are doable / applicable then I don’t think you necessarily need to avoid it.

The dynamic offsets in this case would have happened upstream upon ingest.

The business case is the following:

  • Booking data from a point-of-sale system is being ingested into the system, one booking at a time.
  • This booking data is aggregated on an hourly basis, and the result is loaded into a table using a CTAS statement.

The chain (in this case, including the offset) looks something like this (simplified a bit):

  • A booking contains a bookingEndedAt, which is the UTC timestamp of the booking, and a site_id, which represents the physical location where the booking took place.
  • A booking contains 1..n line items.
  • Each line item contains a plu, a line_total, and a quantity.
  • These line items are aggregated based on site_id and plu.

CREATE OR REPLACE STREAM BOOKING_STREAM WITH (KAFKA_TOPIC='bookings', TIMESTAMP='bookingEndedAt', VALUE_FORMAT='AVRO');

CREATE OR REPLACE STREAM LINE_ITEMS_EXPLODED WITH (TIMESTAMP='booking_ts') AS SELECT
  SITEID AS SITE_ID,
  BOOKINGENDEDAT AS BOOKING_TS,
  OFFSETBOOKINGENDEDAT AS OFFSET_BOOKING_TS,
  EXPLODE(LINEITEMS) AS LINE_ITEM
FROM BOOKING_STREAM
PARTITION BY SITEID;

CREATE OR REPLACE STREAM LINE_ITEMS WITH (TIMESTAMP='booking_ts') AS SELECT
  SITE_ID,
  BOOKING_TS,
  OFFSET_BOOKING_TS,
  LINE_ITEM->PLU,
  LINE_ITEM->LINETOTAL AS LINE_TOTAL,
  LINE_ITEM->QUANTITY AS QUANTITY
FROM LINE_ITEMS_EXPLODED
WHERE LINE_ITEM->TYPE IN ('BOOKING', 'BOOKING_VOID');

CREATE OR REPLACE TABLE HOURLY_BOOKING_ITEM_REVENUE WITH (KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID=17) AS SELECT
  SITE_ID, AS_VALUE(SITE_ID) AS "siteId",
  PLU, AS_VALUE(PLU) AS "plu",
  SUM(LINE_TOTAL) AS "lineTotal",
  SUM(QUANTITY) AS "sumQuantity",
  AS_VALUE(windowstart) as "hour"
FROM LINE_ITEMS
WINDOW TUMBLING ( SIZE 1 HOURS, GRACE PERIOD 72 HOURS )
GROUP BY SITE_ID, PLU;

However, I’ve noticed that my approach will not work as expected. This is because, for the hourly aggregation, I still need to use the correct timestamp, which is the timestamp of the booking, and when creating the table I cannot window on any timestamp other than the one the source stream uses. Essentially, I need an OFFSET_LINE_ITEMS stream which duplicates the data in LINE_ITEMS.
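
For clarity, the duplicated stream would be essentially this: the same columns as LINE_ITEMS, just with the offset column driving the record timestamp, so the business-day aggregation could window over it while the hourly aggregation keeps using LINE_ITEMS.

-- identical payload to LINE_ITEMS; only the record timestamp differs
CREATE OR REPLACE STREAM OFFSET_LINE_ITEMS WITH (TIMESTAMP='offset_booking_ts') AS SELECT
  SITE_ID,
  BOOKING_TS,
  OFFSET_BOOKING_TS,
  PLU,
  LINE_TOTAL,
  QUANTITY
FROM LINE_ITEMS;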

This unfortunately seems to be a blocker for this approach, as it would mean a lot of duplicated storage. Since the definition of a “business day” is a hard requirement for this business case, I believe there's not really an option besides doing another layer of aggregation at query time.
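
Sketching out what I mean by that last part: assuming the hourly aggregates end up in some downstream SQL store, the extra rollup at query time could look roughly like this (the table and column names are hypothetical):

-- hypothetical downstream rollup: shift each hour back by 4 hours and
-- group by the resulting calendar date to get business-day totals
SELECT
  site_id,
  plu,
  CAST(hour_ts - INTERVAL '4' HOUR AS DATE) AS business_day,
  SUM(sum_quantity) AS sum_quantity,
  SUM(line_total) AS line_total
FROM hourly_booking_item_revenue
GROUP BY site_id, plu, CAST(hour_ts - INTERVAL '4' HOUR AS DATE);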
