I have a topic whose records contain `startDate` and `endDate` attributes. When building the stream, I would like to explode each row into multiple rows, one for each day from the start date to the end date (inclusive). For didactic reasons, I’m converting timestamps into string dates in the examples below.
In case someone else is looking for this: I solved it with the Java user-defined table function (UDTF) below, created by following these steps:
package com.example;
import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udtf.Udtf;
import io.confluent.ksql.function.udtf.UdtfDescription;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
// ksqlDB user-defined table functions
@UdtfDescription(name = "date_explode",
    author = "Rafael",
    version = "1.0.0",
    description = "Given a start and end date, explode the period into a list of dates.")
public class DateExplodeUdtf {

  /**
   * Explodes the inclusive day range {@code [startDate, endDate]} into one timestamp per day.
   *
   * <p>Each day is anchored at midnight UTC, so the emitted timestamp for epoch day {@code n}
   * is exactly {@code n * 86_400_000} milliseconds since the epoch, whatever the JVM's default
   * time zone is. (Converting a {@code LocalDateTime} with {@code Timestamp.valueOf} would
   * instead interpret midnight in the server's local zone and shift every value by that zone's
   * offset.)
   *
   * @param startDate first day of the range, in days since 1970-01-01; may be {@code null}
   * @param endDate last day of the range (inclusive), in days since 1970-01-01; may be
   *     {@code null}
   * @return one timestamp per day from {@code startDate} through {@code endDate}; an empty list
   *     if either bound is {@code null} or if {@code startDate} is after {@code endDate}
   */
  @Udtf(description = "start and end date are days from epoch")
  public List<Timestamp> dateExplode(
      @UdfParameter(value = "startDate", description = "start date (days from epoch)") final Integer startDate,
      @UdfParameter(value = "endDate", description = "end date (days from epoch)") final Integer endDate
  ) {
    final List<Timestamp> result = new ArrayList<>();
    // Boxed parameters can be null (e.g. a NULL column value); unboxing them in
    // LocalDate.ofEpochDay would throw a NullPointerException, so emit no rows instead.
    if (startDate == null || endDate == null) {
      return result;
    }
    final LocalDate last = LocalDate.ofEpochDay(endDate);
    // Inclusive of both ends; yields nothing when startDate > endDate.
    for (LocalDate day = LocalDate.ofEpochDay(startDate); !day.isAfter(last); day = day.plusDays(1)) {
      // Anchor at UTC so the result does not depend on the JVM default time zone.
      result.add(Timestamp.from(day.atStartOfDay(ZoneOffset.UTC).toInstant()));
    }
    return result;
  }
}
Then call it from a ksqlDB query:
-- Emits one output row per day in [startDate, endDate] for every input row.
-- EMIT CHANGES makes this a push query, so records arriving on the stream
-- keep being exploded continuously instead of the query terminating.
SELECT
  id,
  DATE_EXPLODE(startDate, endDate) AS date
FROM mystream
EMIT CHANGES;