TopologyTestDriver for left join

Hi guys!
I am trying to test my topology, which has two outputs.
The first one is pretty simple, without any joins, and it works fine.
The second one has a leftJoin, and its test fails.
Here is my topology:

TOPOLOGY:
KStream<String, Student> studentsStream = getStudentsStream();

KTable<String, UniRoom> universityRoomsTable = roomsStream
    .groupBy((k, v) -> v.getId(), Grouped.with(Serdes.String(), avroRoomSerde))
    .reduce((aggValue, newValue) -> newValue,
        Materialized.<String, UniRoom, KeyValueStore<Bytes, byte[]>>as("rooms-store" + System.currentTimeMillis())
            .withKeySerde(Serdes.String())
            .withValueSerde(avroRoomSerde));

The join itself:

studentsStream.map((k, v) -> KeyValue.pair(v.getRoomId(), v))
    .leftJoin(universityRoomsTable, mergeFunction, Joined.with(Serdes.String(), avroStudentSerde, avroRoomSerde))
    // … and so on

Here is my test.

Creating the input topics and pushing data into them:

var studentAvro = Student.builder().roomId(1).build();
var roomAvro = Room.builder().id(1).build();

studentsTopic.pipeInput("1", studentAvro);
roomsTopic.pipeInput("1", roomAvro);

Then, in my mergeFunction, which is a method(Student, UniRoom), the right side (the UniRoom) is always null!
Why?
When I deploy this code to DEV, records match by room id and the merge function runs successfully in the join. Why does it fail with TopologyTestDriver?

studentsTopic.pipeInput("1", studentAvro);
roomsTopic.pipeInput("1", roomAvro);

Order matters. When you send the student record first, the rooms table is empty and thus it does not join. If you switch the order it should work.
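
To make the ordering point concrete, here is a minimal plain-Java model of a stream-table left join. It is not the Kafka Streams API: `LeftJoinOrderSketch`, `putRoom`, and `joinStudent` are hypothetical names, and the sketch assumes only what is described above, namely that the table is looked up at the moment the stream record is processed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a stream-table left join (NOT the Kafka Streams API).
// All names are hypothetical and chosen for illustration only.
class LeftJoinOrderSketch {
    // Stands in for the materialized rooms table: roomId -> room.
    private final Map<String, String> roomsTable = new HashMap<>();
    // Join results, in processing order.
    private final List<String> output = new ArrayList<>();

    // A table-side record only updates the table; it emits nothing.
    void putRoom(String roomId, String room) {
        roomsTable.put(roomId, room);
    }

    // A stream-side record looks up the table as it is right now.
    // Left join: the student is always emitted, the room may be null.
    void joinStudent(String roomId, String student) {
        String room = roomsTable.get(roomId);
        output.add(student + "+" + room);
    }

    // Student is processed before the room exists in the table.
    static List<String> studentFirst() {
        LeftJoinOrderSketch t = new LeftJoinOrderSketch();
        t.joinStudent("1", "student");
        t.putRoom("1", "room");
        return t.output;
    }

    // Room is processed first, so the lookup finds it.
    static List<String> roomFirst() {
        LeftJoinOrderSketch t = new LeftJoinOrderSketch();
        t.putRoom("1", "room");
        t.joinStudent("1", "student");
        return t.output;
    }

    public static void main(String[] args) {
        System.out.println(studentFirst()); // [student+null]
        System.out.println(roomFirst());    // [student+room]
    }
}
```

In this model a later table update does not re-trigger the join for a student that was already processed, which matches the behavior described in the answer.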

When you execute in a real environment reading from a Kafka topic, the Kafka Streams runtime will pipe records in timestamp order, so if the table (room) record has a smaller timestamp, it would be processed first, and when the stream (student) record with a larger timestamp comes in later, the join works, too.
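
The timestamp-order idea can be sketched in plain Java as well. This is only a toy model, not the Kafka Streams runtime: `TimestampOrderSketch` and `processingOrder` are hypothetical names, it assumes both records are already buffered when the choice is made, and the tie-breaking rule (room first on equal timestamps) is an arbitrary choice for the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of timestamp-ordered processing (NOT the Kafka Streams runtime).
class TimestampOrderSketch {
    // Each deque holds the timestamps of one topic's buffered records,
    // in the order they were written. The record with the smaller head
    // timestamp is processed next.
    static List<String> processingOrder(Deque<Long> students, Deque<Long> rooms) {
        List<String> order = new ArrayList<>();
        while (!students.isEmpty() || !rooms.isEmpty()) {
            boolean takeRoom;
            if (rooms.isEmpty()) {
                takeRoom = false;
            } else if (students.isEmpty()) {
                takeRoom = true;
            } else {
                // Arbitrary tie-break for this sketch: room first.
                takeRoom = rooms.peekFirst() <= students.peekFirst();
            }
            if (takeRoom) {
                rooms.pollFirst();
                order.add("rooms");
            } else {
                students.pollFirst();
                order.add("students");
            }
        }
        return order;
    }

    // The student record has the larger timestamp, so the room goes first.
    static List<String> demo() {
        Deque<Long> students = new ArrayDeque<>(List.of(200L));
        Deque<Long> rooms = new ArrayDeque<>(List.of(100L));
        return processingOrder(students, rooms);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [rooms, students]
    }
}
```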

Btw:

KeyValueStore<Bytes, byte[]>>as("rooms-store" + System.currentTimeMillis())

Why do you add a timestamp to the store name? This is a little odd… A store name should be fixed.

Here is an example with the order changed, and it still doesn't work:
rooms.pipeInput("1", roomOne);
Thread.sleep(4000L);
students.pipeInput("1", studentFirst);

Whenever I receive an entry in the students stream, my leftJoin function is called and the entry from the KTable is null.

About the store name: I had added the timestamp because I had problems when starting the topology test. Now that I have removed it, it works fine.
I saw that the pipeInput() method has overloads that take an Instant.
I've also tried adding some TimeUnit delay, but that didn't work.
I also tried piping the KTable message in @BeforeEach, and again got the same result :)

Okay, it worked…
I was using a roomsTopic input topic created by another driver, one used in a test related to the rooms logic.
So here is what I did:
I have a separate driver that runs the Students topology, and I now create the rooms input topic from that driver.
I caught this while debugging, when I tried to get the store from the rooms driver.
I have two drivers because I have two topologies, and it is quite easy to mix them up.
So this was the solution: use only one driver per test, and only topics created from that driver.
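
For anyone hitting the same thing, here is a small plain-Java model of why the mix-up produces a null on the right side. It is not the TopologyTestDriver API: `MiniDriver`, `RoomsInput`, and their methods are hypothetical stand-ins, under the assumption that each driver owns its own state and that an input topic handle writes only into the driver that created it.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of two isolated test drivers (NOT the TopologyTestDriver API).
class DriverIsolationSketch {
    // Each driver instance owns its own rooms table.
    static final class MiniDriver {
        private final Map<String, String> roomsTable = new HashMap<>();

        // The returned handle is bound to THIS driver's state.
        RoomsInput createRoomsInput() {
            return new RoomsInput(this);
        }

        // Left join against this driver's own table only.
        String joinStudent(String roomId, String student) {
            return student + "+" + roomsTable.get(roomId);
        }
    }

    // Input handle that remembers which driver created it.
    static final class RoomsInput {
        private final MiniDriver owner;

        RoomsInput(MiniDriver owner) {
            this.owner = owner;
        }

        void pipeInput(String roomId, String room) {
            owner.roomsTable.put(roomId, room);
        }
    }

    // The bug: the rooms handle comes from a different driver.
    static String mixedDrivers() {
        MiniDriver roomsDriver = new MiniDriver();
        MiniDriver studentsDriver = new MiniDriver();
        roomsDriver.createRoomsInput().pipeInput("1", "room");
        return studentsDriver.joinStudent("1", "student");
    }

    // The fix: one driver, and every handle is created from it.
    static String singleDriver() {
        MiniDriver driver = new MiniDriver();
        driver.createRoomsInput().pipeInput("1", "room");
        return driver.joinStudent("1", "student");
    }

    public static void main(String[] args) {
        System.out.println(mixedDrivers()); // student+null
        System.out.println(singleDriver()); // student+room
    }
}
```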
