Kafka Streams KTable Race Condition: Multiple Concurrent Updates See Same Stale State

I’m building a conference system using Kafka Streams where users can join/leave rooms.

I’m experiencing a race condition where multiple concurrent leave requests see the same stale room state, causing incorrect final results.

Expected behavior: Two users leave simultaneously → room becomes empty
Actual behavior: Two users leave simultaneously → one user remains in room

Setup

  • Topic partitions: All topics have 1 partition
  • Architecture: KTable for room state + KStream for leave requests

The Problem

When two users send leave requests at nearly the same time:

2025-06-23 17:08:10,997 User user1 left room testroom2. Users before: 2, after: 1
2025-06-23 17:08:10,997 User user2 left room testroom2. Users before: 2, after: 1

Both operations see “Users before: 2” which means they’re both reading the same stale KTable state.

Alice & Bob Example

What Should Happen:

  • Initial: Room has [Alice, Bob] (2 users)
  • Alice leaves: Room has [Bob] (1 user)
  • Bob leaves: Room has [] (0 users)

What Actually Happens:

  • Initial: Room has [Alice, Bob] (2 users)
  • Alice’s leave request: Sees room with [Alice, Bob] → creates room with [Bob]
  • Bob’s leave request: ALSO sees room with [Alice, Bob] → creates room with [Alice]
  • Final result: Room has [Alice] ← Wrong! Should be empty.
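To convince myself this is the classic lost-update pattern and not something Kafka-specific, here's a minimal plain-Java sketch of the interleaving above (no Kafka involved; `leave()` is a made-up stand-in for what my topology does, copying whatever snapshot the request happened to read):

```java
import java.util.Set;
import java.util.TreeSet;

public class LostUpdateDemo {
    // Simulate one leave request: copy the snapshot it saw, remove the user.
    static Set<String> leave(Set<String> snapshot, String user) {
        Set<String> next = new TreeSet<>(snapshot);
        next.remove(user);
        return next;
    }

    public static void main(String[] args) {
        // Both requests read the SAME stale snapshot [alice, bob].
        Set<String> snapshot = new TreeSet<>(Set.of("alice", "bob"));

        Set<String> afterAlice = leave(snapshot, "alice"); // sees 2 users -> [bob]
        Set<String> afterBob = leave(snapshot, "bob");     // ALSO sees 2 users -> [alice]

        // Last write wins: whichever output lands in the table last
        // overwrites the other, so one user "survives".
        System.out.println(afterBob); // [alice] -- should have been empty
    }
}
```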

Original Code (Has Race Condition)

// Global table for conference room state
private GlobalKTable<ConferenceRoomKey, ConferenceRoom> conferenceRoomsTable;

// Create leave request stream
KStream<LeaveConferenceRequestKey, LeaveConferenceRequest> leaveRequestStream = 
    builder.stream(Topics.CONFERENCE_LEAVE_REQUESTS,
        Consumed.with(Red5ProKey.newSerdes(LeaveConferenceRequestKey.class), 
                     JsonSerdes.createSerde(LeaveConferenceRequest.class)));

// Process leave requests - THIS HAS THE RACE CONDITION
leaveRequestStream
    .selectKey((key, value) -> value.getRoomId())
    .flatMap((key, value) -> {
        List<KeyValue<Object, Object>> results = new ArrayList<>();
        
        // This calls getRoom() which reads from GlobalKTable
        // PROBLEM: Multiple concurrent calls see the same stale state
        LeaveConferenceResponse response = leaveRoom(value.getRoomId(), value.getStreamName());
        
        if (response != null) {
            ConferenceRoomStateKey roomStateKey = new ConferenceRoomStateKey(UUID.randomUUID().toString());
            results.add(KeyValue.pair(roomStateKey, response.getConferenceRoomState()));
            
            ConferenceRoomKey roomKey = new ConferenceRoomKey(value.getRoomId());
            ConferenceRoom room = response.getConferenceRoomState().getConferenceRoom();
            results.add(KeyValue.pair(roomKey, room));
        }
        return results;
    })
    // Branch to send to different topics
    .split()
    .branch((key, value) -> value instanceof ConferenceRoomState,
            Branched.withConsumer(stream -> stream
                .selectKey((k, v) -> (ConferenceRoomStateKey) k)
                .mapValues(v -> (ConferenceRoomState) v)
                .to(Topics.CONFERENCE_ROOM_STATES,
                        Produced.with(Red5ProKey.newSerdes(ConferenceRoomStateKey.class),
                                JsonSerdes.createSerde(ConferenceRoomState.class)))))
    .branch((key, value) -> value instanceof ConferenceRoom,
            Branched.withConsumer(stream -> stream
                .selectKey((k, v) -> (ConferenceRoomKey) k)
                .mapValues(v -> (ConferenceRoom) v)
                .to(Topics.CONFERENCE_ROOMS, 
                        Produced.with(Red5ProKey.newSerdes(ConferenceRoomKey.class),
                                JsonSerdes.createSerde(ConferenceRoom.class)))));

The problematic leaveRoom() method:

public LeaveConferenceResponse leaveRoom(String roomId, String streamName) {
    ConferenceRoomKey roomKey = new ConferenceRoomKey(roomId);

    // RACE CONDITION: This reads from GlobalKTable store
    // Multiple concurrent calls can see the same state before updates propagate
    ConferenceRoom room = getConferenceRoomStore().get(roomKey);
    if (room == null) {
        log.warn("Conference room id: {} not found.", roomId);
        return null;
    }

    ConferenceUser user = room.getUser(streamName);
    if (user == null) {
        log.warn("User with ID {} not found in room {}", streamName, roomId);
        return new LeaveConferenceResponse(new ConferenceRoomState(room));
    }

    // PROBLEM: This mutates the room object directly
    // If two calls happen simultaneously, both see the same initial state
    room.removeUser(streamName);

    ConferenceRoomState conferenceRoomState = new ConferenceRoomState(room);
    log.info("User {} left room {}", streamName, roomId);
    return new LeaveConferenceResponse(conferenceRoomState);
}
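For contrast, this is the behavior I expected: each leave request applied against the latest state rather than a snapshot, so the second request observes the first one's result. A plain-Java sketch of that (names and the in-memory "store" are made up; this is just my mental model of updates for one key being processed one after another):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class SerializedUpdates {
    // The "state store": latest room membership, keyed by room id.
    final Map<String, Set<String>> store = new HashMap<>();

    // Apply one leave request against the LATEST state, then write back.
    // Because updates for a key run sequentially, the second request
    // sees the first one's result instead of a stale snapshot.
    Set<String> applyLeave(String roomId, String user) {
        Set<String> current = store.getOrDefault(roomId, new TreeSet<>());
        Set<String> next = new TreeSet<>(current);
        next.remove(user);
        store.put(roomId, next);
        return next;
    }

    public static void main(String[] args) {
        SerializedUpdates table = new SerializedUpdates();
        table.store.put("testroom2", new TreeSet<>(Set.of("user1", "user2")));

        table.applyLeave("testroom2", "user1");                    // sees 2 users -> [user2]
        Set<String> end = table.applyLeave("testroom2", "user2");  // sees 1 user  -> []

        System.out.println(end.isEmpty()); // true -- the room really empties
    }
}
```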

What I’ve Tried

1. Switched to KTable + leftJoin

private KTable<ConferenceRoomKey, ConferenceRoom> conferenceRoomsTable;

leaveRequestStream
    .selectKey((key, value) -> new ConferenceRoomKey(value.getRoomId()))
    .leftJoin(
        conferenceRoomsTable,
        (leaveRequest, currentRoom) -> {
            if (currentRoom == null || !currentRoom.getUsers().containsKey(leaveRequest.getStreamName())) {
                return currentRoom;
            }
            
            // Create new room instance (immutable update)
            ConferenceRoom updatedRoom = new ConferenceRoom(currentRoom.getRoomId());
            for (Map.Entry<String, ConferenceUser> entry : currentRoom.getUsers().entrySet()) {
                updatedRoom.addUser(entry.getValue());
            }
            updatedRoom.removeUser(leaveRequest.getStreamName());
            return updatedRoom;
        }
    )
    .filter((key, room) -> room != null)
    .to(Topics.CONFERENCE_ROOMS, ...);

Result: Still has race condition.

2. Added MAX_TASK_IDLE_MS Configuration

@Override
protected Properties streamsConfig(...) {
    Properties props = super.streamsConfig(...);
    props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L);
    return props;
}

Result: This seemed to slightly reduce the occurrence rate, but the problem persisted and it still felt like a workaround. Honestly, I'm not even sure it helped.

I understand this is a concurrency issue. I expected the second leave request to be processed only after the first one had completed, but that isn't what happens.

Any insights into why these standard Kafka Streams patterns aren't preventing the race condition would be greatly appreciated! I'm new to Kafka and pretty confused.

Cross-posted on SO: https://stackoverflow.com/questions/79676712/kafka-streams-ktable-race-condition-multiple-concurrent-updates-see-same-stale (I've also replied there.)