I need multiple input topics to be read in order, based on the default message arrival timestamp. I've read the "flow control with timestamps" documentation. The only way I've found to force the DSL to build a single subtopology for multiple input topics is to use a "fake" transform for each input and pass a shared "fake" state store to all of the transforms.
Question 1: Is there an easier way to hack the DSL into putting all inputs in a single subtopology, so that processing is ordered across all inputs by timestamp? Question 2: How can I achieve the same effect with the PAPI?
The topology is fairly straightforward as far as Kafka Streams configurations go. We'd like all messages to be consumed in timestamp order, without having to write the code to synchronize between the topics ourselves.
@Configuration
@EnableConfigurationProperties({AppProperties.class})
public class KafkaStreamsConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    private final AppProperties props;

    @Autowired
    public KafkaStreamsConfig(AppProperties props) {
        this.props = props;
    }

    @Bean
    public KafkaStreamsConfig setup(StreamsBuilder builder) {
        log.info("Initializing KafkaStreamsConfig...");

        KStream<String, EmployeeCalled> employeeCalledStream = builder.stream(
                props.getEmployeeCalledInputTopic(),
                Consumed.with(Serdes.String(), new ProtoSerde<>(EmployeeCalled::new)));
        employeeCalledStream
                .mapValues(new EmployeeCalledMapper())
                .to(props.getEmployeeStatusOutputTopic(),
                        Produced.with(Serdes.String(), new ProtoSerde<>(EmployeeStatus::new)));

        KStream<String, EmployeeCallReleased> employeeCallReleasedStream = builder.stream(
                props.getEmployeeCallReleasedInputTopic(),
                Consumed.with(Serdes.String(), new ProtoSerde<>(EmployeeCallReleased::new)));
        employeeCallReleasedStream
                .mapValues(new EmployeeCallReleasedMapper())
                .to(props.getEmployeeStatusOutputTopic(),
                        Produced.with(Serdes.String(), new ProtoSerde<>(EmployeeStatus::new)));

        KStream<String, EmployeeRested> employeeRestedStream = builder.stream(
                props.getEmployeeRestedInputTopic(),
                Consumed.with(Serdes.String(), new ProtoSerde<>(EmployeeRested::new)));
        employeeRestedStream
                .mapValues(new EmployeeRestedMapper())
                .to(props.getEmployeeStatusOutputTopic(),
                        Produced.with(Serdes.String(), new ProtoSerde<>(EmployeeStatus::new)));

        KStream<String, EmployeeTieUp> employeeTieUpStream = builder.stream(
                props.getEmployeeTieUpInputTopic(),
                Consumed.with(Serdes.String(), new ProtoSerde<>(EmployeeTieUp::new)));
        employeeTieUpStream
                .mapValues(new EmployeeTieUpMapper())
                .to(props.getEmployeeStatusOutputTopic(),
                        Produced.with(Serdes.String(), new ProtoSerde<>(EmployeeStatus::new)));

        KStream<String, EmployeeOnDuty> employeeOnDutyStream = builder.stream(
                props.getEmployeeOnDutyInputTopic(),
                Consumed.with(Serdes.String(), new ProtoSerde<>(EmployeeOnDuty::new)));
        employeeOnDutyStream
                .mapValues(new EmployeeOnDutyMapper())
                .to(props.getEmployeeStatusOutputTopic(),
                        Produced.with(Serdes.String(), new ProtoSerde<>(EmployeeStatus::new)));

        KStream<String, EmployeeCallBusted> employeeCallBustedStream = builder.stream(
                props.getEmployeeCallBustedInputTopic(),
                Consumed.with(Serdes.String(), new ProtoSerde<>(EmployeeCallBusted::new)));
        employeeCallBustedStream
                .mapValues(new EmployeeCallBustedMapper())
                .to(props.getEmployeeStatusOutputTopic(),
                        Produced.with(Serdes.String(), new ProtoSerde<>(EmployeeStatus::new)));

        log.debug("Initialized {}", builder.build().describe());
        return this;
    }
}
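For reference, the "fake transform with a shared fake state store" workaround described at the top would look roughly like this for one of the streams. This is a sketch, not our actual code: the store name "dummy-sync-store" and the no-op transformer are illustrative.

```java
// A dummy store shared by all input streams; connecting every stream to it
// forces the DSL to merge all inputs into one subtopology, so timestamp
// synchronization applies across all of them within each task.
StoreBuilder<KeyValueStore<String, String>> dummyStore =
        Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("dummy-sync-store"),
                Serdes.String(), Serdes.String());
builder.addStateStore(dummyStore);

// Repeated for each of the six input streams; the transformer does nothing,
// attaching the shared store is the whole trick.
employeeCalledStream
        .transformValues(() -> new ValueTransformerWithKey<String, EmployeeCalled, EmployeeCalled>() {
            @Override public void init(ProcessorContext context) { }
            @Override public EmployeeCalled transform(String key, EmployeeCalled value) { return value; }
            @Override public void close() { }
        }, "dummy-sync-store")
        .mapValues(new EmployeeCalledMapper())
        .to(props.getEmployeeStatusOutputTopic(),
                Produced.with(Serdes.String(), new ProtoSerde<>(EmployeeStatus::new)));
```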
The only thing I can think of would be to use a single builder.stream(...) call, passing in a list of topic names, and to add a process() step that branches the records based on topic name (you need process() instead of split() in order to access the topic name).
If you name the downstream operators via .mapValues(..., Named.as(...)), it should be simple to split the input with context.forward(..., <childName>).
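A rough sketch of this suggestion, with only two of the topics shown (the child names and byte-array serdes are illustrative; values are read as raw bytes because each topic carries a different Protobuf type):

```java
// One source node for all topics => a single subtopology, so records are
// timestamp-synchronized per task across all the input topics.
KStream<String, byte[]> allInputs = builder.stream(
        List.of(props.getEmployeeCalledInputTopic(),
                props.getEmployeeCallReleasedInputTopic()),   // ...and the rest
        Consumed.with(Serdes.String(), Serdes.ByteArray()));

allInputs.process(() -> new Processor<String, byte[], String, byte[]>() {
    private ProcessorContext<String, byte[]> context;

    @Override
    public void init(ProcessorContext<String, byte[]> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, byte[]> record) {
        // Route by source topic; the child names must match the Named.as(...)
        // names given to the downstream operators.
        String topic = context.recordMetadata()
                .map(RecordMetadata::topic)
                .orElseThrow();
        if (topic.equals(props.getEmployeeCalledInputTopic())) {
            context.forward(record, "employee-called-mapper");
        } else {
            context.forward(record, "employee-call-released-mapper");
        }
    }
});
```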
Note though: timestamp synchronization still happens only on a per-task basis. If your input topics have multiple partitions, time is synchronized within a task only (and thus across all partitions with the same index/number), but not between tasks. Because different tasks might be executed by different threads and/or instances, time synchronization between tasks is not supported.
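As for question #2: with the PAPI a single subtopology falls out naturally, because you wire the topology by hand. A minimal sketch, where all node names and topic names are illustrative and RouterProcessor is a hypothetical Processor implementation that deserializes per topic and emits EmployeeStatus records:

```java
// One source node subscribed to every input topic yields a single
// subtopology, so per-task timestamp synchronization covers all topics.
Topology topology = new Topology();
topology.addSource("all-inputs",
        new StringDeserializer(), new ByteArrayDeserializer(),
        "employee-called", "employee-call-released", "employee-rested",
        "employee-tie-up", "employee-on-duty", "employee-call-busted");
topology.addProcessor("router", RouterProcessor::new, "all-inputs");
topology.addSink("status-out", "employee-status",
        new StringSerializer(), new ByteArraySerializer(), "router");
```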