How to explode rows for each day between 2 timestamp columns?

I have a topic that contains a startDate and an endDate attribute. When building the stream I would like to explode the single row into multiple rows, one for each day between start and end dates. For didactic reasons, I’m converting timestamps into string dates in the examples below.

Input data:

{ "id": 1, "startDate": "2023-03-01", "endDate": "2023-03-05" }
{ "id": 2, "startDate": "2023-03-02", "endDate": "2023-03-06" }

Some initial stream loading from the topic:

CREATE STREAM mystream (
   id INT
  ,startDate TIMESTAMP
  ,endDate TIMESTAMP
) WITH (
  kafka_topic='mytopic',
  value_format='json',
  partitions=1
);
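
For testing, the two sample rows could be inserted with something along these lines (assuming INSERT VALUES accepts CASTing an ISO-8601 string to TIMESTAMP):

INSERT INTO mystream (id, startDate, endDate)
  VALUES (1, CAST('2023-03-01T00:00:00' AS TIMESTAMP), CAST('2023-03-05T00:00:00' AS TIMESTAMP));
INSERT INTO mystream (id, startDate, endDate)
  VALUES (2, CAST('2023-03-02T00:00:00' AS TIMESTAMP), CAST('2023-03-06T00:00:00' AS TIMESTAMP));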

The desired streaming result after transformations and explosions:

{ "id": 1, "date": "2023-03-01" }
{ "id": 1, "date": "2023-03-02" }
{ "id": 1, "date": "2023-03-03" }
{ "id": 1, "date": "2023-03-04" }
{ "id": 1, "date": "2023-03-05" }
{ "id": 2, "date": "2023-03-02" }
{ "id": 2, "date": "2023-03-03" }
{ "id": 2, "date": "2023-03-04" }
{ "id": 2, "date": "2023-03-05" }
{ "id": 2, "date": "2023-03-06" }

Is it possible to do something like this with ksqlDB?


EXPLODE works on an input ARRAY, so you cannot use it for your use case. You could write a user-defined table function (UDTF) though: User-defined functions (UDFs) - ksqlDB Documentation
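
For context, the built-in EXPLODE only fans out the elements of an ARRAY column you already have, so it cannot generate the missing days by itself; the stream and column names below are made up for illustration:

-- EXPLODE emits one output row per element of an existing ARRAY column
SELECT id, EXPLODE(tags) AS tag
FROM tagged_events
EMIT CHANGES;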

Thank you! That worked for me. 🙂

If someone else is looking for this, the Java program below was created by following the steps in the documentation linked above:

package com.example;

import io.confluent.ksql.function.udtf.Udtf;
import io.confluent.ksql.function.udtf.UdtfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

@UdtfDescription(name = "date_explode",
         author = "Rafael",
         version = "1.0.0",
         description = "Given a start and end date, explode the period into a list of dates.")
public class DateExplodeUdtf {

  @Udtf(description = "start and end date are days from epoch")
  public List<Timestamp> dateExplode(
    @UdfParameter(value = "startDate", description = "start date (days from epoch)") final Integer startDate,
    @UdfParameter(value = "endDate", description = "end date (days from epoch)") final Integer endDate
  ) {
    List<Timestamp> result = new ArrayList<>();
    LocalDate startDateComputed = LocalDate.ofEpochDay(startDate);
    LocalDate endDateComputed = LocalDate.ofEpochDay(endDate);
    // Walk day by day, inclusive of both the start and the end date
    for (LocalDate date = startDateComputed; !date.isAfter(endDateComputed); date = date.plusDays(1)) {
      result.add(Timestamp.valueOf(date.atStartOfDay()));
    }
    return result;
  }
}
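
Once the class is packaged into a jar and copied into the directory configured by ksql.extension.dir (as described in the UDF documentation linked above), you can check that ksqlDB picked the function up:

-- Confirm the UDTF was loaded and inspect its signature
SHOW FUNCTIONS;
DESCRIBE FUNCTION DATE_EXPLODE;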

And when executing:

SELECT
   id
  ,DATE_EXPLODE(startDate, endDate) AS date
FROM mystream
EMIT CHANGES;
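
If you want the exploded rows persisted as their own stream rather than returned by a transient query, a minimal sketch would be the following (the stream name mystream_days is made up, and it assumes the startDate/endDate columns match the UDTF's parameter types, i.e. day counts as in the class above):

CREATE STREAM mystream_days AS
  SELECT
     id
    ,DATE_EXPLODE(startDate, endDate) AS date
  FROM mystream
  EMIT CHANGES;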
