Kafka Streams test case for left join

Hi folks,

I am trying to write a unit test using TopologyTestDriver for the use case below, which involves a left join. When a record is published to the left side of the join, there is no output. Only when I also publish a record to the right side does a record appear in the output.

```scala
import com.twitter.finatra.kafkastreams.utils.ScalaStreamsImplicits
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.scala.serialization.Serdes
import org.apache.kafka.streams.scala.serialization.Serdes.stringSerde
import org.apache.kafka.streams.{StreamsBuilder, TopologyTestDriver}
import org.scalatest.wordspec.AnyWordSpecLike

import java.time.Duration

class TestTopologyDriver extends AnyWordSpecLike with ScalaStreamsImplicits {
  "Test driver" must {
    "Happy scenario" in {
      val builder = new StreamsBuilder()
      val inputTopic1 = "inputTopic1"
      val inputTopic2 = "inputTopic2"
      val outputTopic = "outputTopic"

      val stream1: KStream[String, String] = builder.asScala.stream[String, String](inputTopic1)
      val stream2: KStream[String, String] = builder.asScala.stream[String, String](inputTopic2)

      // Left join within a 1-second window; `b` is null for an unmatched left record
      stream1
        .leftJoin(stream2)(
          (a, b) => a + b,
          JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(1))
        )
        .to(outputTopic)

      val topology = builder.build()
      val testDriver = new TopologyTestDriver(topology)
      val stringSerializer = Serdes.stringSerde.serializer()
      val stringDeserializer = Serdes.stringSerde.deserializer()

      try {
        val input1Stream = testDriver.createInputTopic(inputTopic1, stringSerializer, stringSerializer)
        val input2Stream = testDriver.createInputTopic(inputTopic2, stringSerializer, stringSerializer)
        val outputStream = testDriver.createOutputTopic(outputTopic, stringDeserializer, stringDeserializer)

        input1Stream.pipeInput("1", "Coming from left side")
        // The output topic is still empty here, so readRecord() throws a NoSuchElementException
        println(outputStream.readRecord())
      } finally {
        testDriver.close() // release the driver's state stores
      }
    }
  }
}
```

If you push a record into the left input, it is put into the join window. As long as it does not join, no output is emitted. Only after a record is pushed to the right-hand side might it join (if both records fall into the same join window). If the left record does not join at all, and thus would produce a `<key, <left, null>>` result, this “left result” is only emitted when the join window closes.
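To make these semantics concrete, here is a deliberately simplified model in plain Scala with no Kafka dependency. It is not Kafka's implementation: the `LeftJoinModel`, `Join`, and `Rec` names are hypothetical, the window is symmetric with no grace period, out-of-order records and the wall-clock throttle are ignored, and stream-time is modeled as the maximum timestamp seen on either input.

```scala
object LeftJoinModel {
  final case class Rec(key: String, value: String, ts: Long)

  /** Hypothetical model of a windowed stream-stream left join (symmetric window, no grace). */
  final class Join(windowMs: Long) {
    private var streamTime = Long.MinValue
    private var lefts = Vector.empty[(Rec, Boolean)] // left record + "has joined at least once"
    private var rights = Vector.empty[Rec]

    private def inWindow(l: Rec, r: Rec): Boolean =
      l.key == r.key && math.abs(l.ts - r.ts) <= windowMs

    /** Stream-time only moves forward; left records whose window closed are dropped,
      * and the ones that never joined are emitted as "left+null". */
    private def advance(ts: Long): Vector[String] = {
      streamTime = math.max(streamTime, ts)
      val (closed, open) = lefts.partition { case (l, _) => streamTime > l.ts + windowMs }
      lefts = open
      closed.collect { case (l, false) => s"${l.value}+null" }
    }

    def pipeLeft(r: Rec): Vector[String] = {
      val flushed = advance(r.ts)
      val joined = rights.filter(o => inWindow(r, o)).map(o => s"${r.value}+${o.value}")
      lefts = lefts :+ ((r, joined.nonEmpty)) // buffered until its window closes
      flushed ++ joined
    }

    def pipeRight(r: Rec): Vector[String] = {
      val flushed = advance(r.ts)
      rights = rights :+ r
      val joined = lefts.collect { case (l, _) if inWindow(l, r) => s"${l.value}+${r.value}" }
      lefts = lefts.map { case (l, done) => (l, done || inWindow(l, r)) }
      flushed ++ joined
    }
  }
}
```

With a 1000 ms window, `pipeLeft(Rec("1", "left", 0))` returns nothing, and only a later `pipeRight(Rec("dummy", "flush", 1001))` returns `Vector("left+null")`, because that record moves stream-time past the end of the left record's window.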

The join window only closes when stream-time advances, and thus only when new records (with a larger timestamp) arrive. Thus, for unit testing, you might need to pipe a final dummy record through the topology to advance stream-time and thus “flush out” all pending records whose windows have closed.
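A minimal sketch of that flush, assuming Kafka 3.x with `kafka-streams-scala` and `kafka-streams-test-utils` on the classpath. It uses the plain Kafka Scala `StreamsBuilder` instead of Finatra's `asScala`; the `"dummy"` key, the fixed start time, and the `no-match` placeholder in the joiner are hypothetical choices for illustration. The left record stays buffered in its 1-second window until a later record moves stream-time past `t0 + 1s`.

```scala
import java.time.{Duration, Instant}

import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

object FlushWithDummyRecord {
  /** Returns the (key, value) pairs found in the output topic after the dummy record. */
  def run(): List[(String, String)] = {
    val builder = new StreamsBuilder()
    val left = builder.stream[String, String]("inputTopic1")
    val right = builder.stream[String, String]("inputTopic2")
    left
      .leftJoin(right)(
        (a, b) => s"$a|${Option(b).getOrElse("no-match")}", // b is null for an unmatched left record
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(1))
      )
      .to("outputTopic")

    val driver = new TopologyTestDriver(builder.build())
    try {
      val in1 = driver.createInputTopic("inputTopic1", stringSerde.serializer(), stringSerde.serializer())
      val out = driver.createOutputTopic("outputTopic", stringSerde.deserializer(), stringSerde.deserializer())
      val t0 = Instant.parse("2024-01-01T00:00:00Z")

      in1.pipeInput("1", "Coming from left side", t0)
      assert(out.isEmpty, "left record should still be buffered in its join window")

      // Hypothetical dummy record: its only purpose is to move stream-time past t0 + 1s
      in1.pipeInput("dummy", "flush", t0.plusMillis(1001))

      var result = List.empty[(String, String)]
      while (!out.isEmpty) {
        val kv = out.readKeyValue()
        result = result :+ (kv.key -> kv.value)
      }
      result // the dummy record itself stays buffered, since its own window is still open
    } finally driver.close()
  }

  def main(args: Array[String]): Unit = println(run())
}
```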

Btw: there is also a wall-clock-time “throttling” mechanism built in (for performance reasons) that controls when left-result records are actually emitted. Thus, you might also need to advance wall-clock time during your test to ensure flushing really happens (or you can disable it by setting the internal “flush interval” config to zero; it’s called `__emit.interval.ms.kstreams.outer.join.spurious.results.fix__`).
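A sketch of disabling the throttle in a test, under the same Kafka 3.x assumptions. The config name is the internal one quoted above, so it is not a public API and may change between versions; passing it as a boxed `Long` and the keys and timestamps are illustrative choices. Two separate flushes happen without ever moving the test driver's mocked wall-clock.

```scala
import java.time.{Duration, Instant}
import java.util.Properties

import org.apache.kafka.streams.{StreamsConfig, TopologyTestDriver}
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

object FlushWithoutThrottle {
  // Internal (non-public) config named in this thread; it may change between Kafka versions
  val EmitIntervalConfig = "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__"

  def run(): List[String] = {
    val builder = new StreamsBuilder()
    builder
      .stream[String, String]("inputTopic1")
      .leftJoin(builder.stream[String, String]("inputTopic2"))(
        (a, b) => s"$a|${Option(b).getOrElse("no-match")}",
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(1))
      )
      .to("outputTopic")

    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "left-join-test")
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
    props.put(EmitIntervalConfig, java.lang.Long.valueOf(0L)) // 0 ms: no wall-clock throttling

    val driver = new TopologyTestDriver(builder.build(), props)
    try {
      val ser = stringSerde.serializer()
      val de = stringSerde.deserializer()
      val in1 = driver.createInputTopic("inputTopic1", ser, ser)
      val in2 = driver.createInputTopic("inputTopic2", ser, ser)
      val out = driver.createOutputTopic("outputTopic", de, de)
      val t0 = Instant.parse("2024-01-01T00:00:00Z")

      in1.pipeInput("a", "first", t0)
      in2.pipeInput("dummy", "flush", t0.plusMillis(1001)) // closes the window of "a"
      in1.pipeInput("b", "second", t0.plusMillis(2000))
      // The mocked wall-clock has not moved since the first flush. Keeping the default interval
      // and calling driver.advanceWallClockTime(...) here is the alternative mentioned above.
      in2.pipeInput("dummy", "flush", t0.plusMillis(3001)) // closes the window of "b"

      var values = List.empty[String]
      while (!out.isEmpty) values = values :+ out.readValue()
      values
    } finally driver.close()
  }

  def main(args: Array[String]): Unit = println(run())
}
```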

For more details on joins, check out “Temporal-Joins in Kafka Streams and ksqlDB” by Matthias Sax (Confluent).


Hey, thanks @mjsax for the detailed reply. The case you mentioned works correctly.
Below is an example with a window of 3 seconds:

```scala
val to = Instant.now()
input1Stream.pipeInput("1", "Coming from left", to)
// 3001 ms later: past the 3-second window of record "1"
input1Stream.pipeInput("2", "Coming from left", to.plusMillis(3001))
input2Stream.pipeInput("3", "Coming from right", to.plusMillis(3001))
```

But I have a question: is this the actual behavior of Kafka Streams, or of the test framework? Consider the case where, during the active window, no record arrives on the right side to join with, and no new record arrives on the left side either. After the window time expires, there is still no output. The test below fails, and I am not sure whether I am testing this the right way.

```scala
val to = Instant.now()
input1Stream.pipeInput("1", "Coming from left", to)
Thread.sleep(4000)
println(outputStream.readRecord())
```

If this is the actual behavior, then when no record arrives on the right side, the left records buffered in the last active window are not emitted until a new record arrives in a later time window and stream-time advances? That would mean the stream's output is always missing the buffered left records of the last active window. Is my interpretation right?

Yes, in your second example, “stream-time” (it’s really a logical clock, computed as `max(r1.ts, r2.ts, ...)` over the records processed so far) will be stuck at `to`. Calling `Thread.sleep` advances wall-clock time, but not the logical stream-time clock, which only advances based on input data timestamps. If there is no input data, stream-time stops advancing, and thus the window is not closed.

Btw: it does not matter whether records arrive on the left or the right input. Both advance stream-time.
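A sketch contrasting the two clocks inside the test driver, under the same Kafka 3.x assumptions as a plain Scala object. `TopologyTestDriver` uses a mocked wall-clock that only moves through `advanceWallClockTime`, so that call stands in for `Thread.sleep(4000)` here. Following the answer above, advancing it does not close the window, while a record on the right input with a later timestamp does. The 3-second window and the keys mirror the follow-up; the `no-match` placeholder is hypothetical.

```scala
import java.time.{Duration, Instant}

import org.apache.kafka.streams.TopologyTestDriver
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

object WallClockVsStreamTime {
  /** Returns (output empty after moving wall-clock?, values emitted after the right-side record). */
  def run(): (Boolean, List[String]) = {
    val builder = new StreamsBuilder()
    builder
      .stream[String, String]("inputTopic1")
      .leftJoin(builder.stream[String, String]("inputTopic2"))(
        (a, b) => s"$a|${Option(b).getOrElse("no-match")}",
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(3))
      )
      .to("outputTopic")

    val driver = new TopologyTestDriver(builder.build())
    try {
      val ser = stringSerde.serializer()
      val de = stringSerde.deserializer()
      val in1 = driver.createInputTopic("inputTopic1", ser, ser)
      val in2 = driver.createInputTopic("inputTopic2", ser, ser)
      val out = driver.createOutputTopic("outputTopic", de, de)
      val t0 = Instant.parse("2024-01-01T00:00:00Z")

      in1.pipeInput("1", "Coming from left", t0)
      // Only the mocked wall-clock moves; stream-time stays at t0
      driver.advanceWallClockTime(Duration.ofSeconds(4))
      val emptyAfterWallClock = out.isEmpty

      // Right-side record with a later timestamp (different key, so no join) advances stream-time
      in2.pipeInput("3", "Coming from right", t0.plusMillis(3001))

      var values = List.empty[String]
      while (!out.isEmpty) values = values :+ out.readValue()
      (emptyAfterWallClock, values)
    } finally driver.close()
  }

  def main(args: Array[String]): Unit = println(run())
}
```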

Kafka Streams assumes that input never stops, because it’s built for stream processing. For bursty streams, this is sometimes an issue. We hope to improve this at some point, but for now, yes, data might be stuck if the input streams do not provide new records. It’s not limited to the test framework.

