Kafka Streams behavior when processing batch exceeds max.poll.interval.ms

Hi, I would like to better understand what happens when the processing of an event (using Kafka Streams) takes longer than max.poll.interval.ms

I started doing some tests on a simplified scenario. The context is as follows:

  • I create one source topic and one destination topic; both have one single partition
  • the app is a simple Java class where I create a Kafka Streams instance, having the following config:
    • processing.guarantee = EXACTLY_ONCE_V2
    • num.stream.threads = 1
    • max.poll.interval.ms = 5000
    • max.poll.records = 10
  • the Kafka Streams consumes from the source topic, produces to the destination topic
  • when consuming, one specific event is designed to have high processing time (higher than max.poll.interval.ms)
  • I am running two instances of the same app (listed below) in parallel, to simulate 2 app nodes; it’s my understanding that this would behave similarly to having 2 Consumers that are part of the same Consumer Group, that will try to consume from the one available partition of the source topic; at any point in time, one should be processing, while the other should be idle.

What exactly happens if consuming an event E takes longer than max.poll.interval.ms? As far as I can understand from the docs, the moment the max.poll.interval.ms is exceeded, the internal Consumer created by the Kafka Streams instance is considered to have left the Group, then a rebalance is triggered.

Q1: When is the rebalancing taking place? Is it exactly when max.poll.interval.ms has been reached (Consumer has left the Group), or is it when the Consumer who left the Group calls the next poll?

Q2: (related to Q1) What happens during rebalancing?
a) Does Kafka reassigns all partitions (including the one where E was produced), even if the processing of E has not finished yet, or
b) Kafka reassigns all other partitions, then waits for event E to be consumed (or perhaps the entire batch that contains E), then re-allocates this partition to any of the other live Consumers in the Group?

With option a) the same event E will be processed in parallel by 2 Consumers, while with option b) we have the guarantee that no event will be concurrently processed by more than one Consumer.
I have tested the scenario both with a local Kafka server in Docker (bitnami/kafka:latest, version 3.6.1) and with Confluent, and I got different results. With local Kafka, we experienced option a), while with Confluent we experienced option b).

Q3: It appears that when processing takes longer than max.poll.interval.ms, the batch is rollbacked. Then the same batch will be fetched and processed again. Since the batch cannot be processed on time, it will always be rollbacked, then reprocessed indefinitely. I see this behavior in almost all my tests, except for one particular combination of max.poll.interval = 5s and processing_time_for_E = 15s. In this case, the batch seems to be rollbacked a couple of times, then at one point the Consumer just breaks out of the loop and continues processing the next batches normally. Which is the correct behavior?

Q4: I was expecting to see that Consumers receive batches of at most max.poll.records events. In our scenario, however, it looks as if the Consumer processes more than 10 events (the batch size as configured in the stream) before the rollback. It looks like (after the first rollback) the Consumer processes a random number of events (between 10 and 19), on every cycle of re-consume batch & rollback. I see there is an internal optimization in the poll method (while processing the received batch, Kafka seems to send the next fetch ahead of time). Is this normal behavior? Should we expect that the batch size is not exactly what we set as max.poll.records?

The app that we have tested with is listed below:

public class Stream {
    private static final Map<String, String> connectionProperties = Map.of(
        StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "pkc-1wvvj.westeurope.azure.confluent.cloud:9092",
        StreamsConfig.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.toString(),
        SaslConfigs.SASL_MECHANISM, "PLAIN",
        SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username='XXX' password='XXX",
        "basic.auth.credentials.source", "USER_INFO"
    );
    private static final String source1 = "source1";
    private static final String destination1 = "destination1";
    private static final Map<String, Object> streamProperties = Map.of(
            StreamsConfig.APPLICATION_ID_CONFIG, "streaming-app-id",
            StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2.name.toLowerCase(),
            StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1,
			// change below to adjust timeout
            ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5_000,
            ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10
    );

    public static void main(String[] args) throws IOException {
        Serde<Integer> intSerde = Serdes.Integer();

        StreamsBuilder builder = new StreamsBuilder();

        builder.stream(source1, Consumed.with(intSerde, intSerde))
                .peek((key, value) -> {
                    System.out.println(Instant.now() + " Thread: " + Thread.currentThread().getName() + ". Consuming: " + value);
                    if (value == 10) {
                        try {
							// change below to adjust processing time
                            Thread.sleep(15_000);
                            System.out.println(Instant.now() + " Finished long processing");
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                })
                .filter((key, value) -> true)
                .to((key, value, record) -> destination1, Produced.with(intSerde, intSerde));

        Topology topology = builder.build();

        Properties props = new Properties();
        props.putAll(connectionProperties);
        props.putAll(streamProperties);

        try(KafkaStreams kafkaStreams = new KafkaStreams(topology, props)) {
            Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

            System.out.println("Start Streams App");
            kafkaStreams.start();
            System.in.read();
        }
    }
}

Q1: When is the rebalancing taking place? Is it exactly when max.poll.interval.ms has been reached (Consumer has left the Group), or is it when the Consumer who left the Group calls the next poll?

It will happen pretty much right after max.poll.interval.ms passed – consumers have a background thread that regularly sends heartbeats to the broker to indicate "liveness. If poll() is not called within max.poll.interval.ms, the heartbeat thread will detect this case and send a “leave group request” to the broker triggering a rebalance.

Q2: (related to Q1) What happens during rebalancing?
a) Does Kafka reassigns all partitions (including the one where E was produced), even if the processing of E has not finished yet, or

Yes, all partitions are re-assigned, to ensure that there is zero unassigned partitions (the consumer dropping out of the consumer group, is considered “dead” and thus does not own its previous partitions any longer).

b) Kafka reassigns all other partitions, then waits for event E to be consumed (or perhaps the entire batch that contains E), then re-allocates this partition to any of the other live Consumers in the Group?

There is no waiting. After the consumer dropped out of the group, it lost ownership and there is no reason to assume that the consumer might ever finish processing E (the whole JVM might have crashed – it’s unknown that it just takes longer than max.poll.interval.ms to finish processing – Kafka needs to assume the “worst case”, otherwise it might block forever, and the partitions could not be re-assigned at all).

With option a) the same event E will be processed in parallel by 2 Consumers, while with option b) we have the guarantee that no event will be concurrently processed by more than one Consumer.
I have tested the scenario both with a local Kafka server in Docker (bitnami/kafka:latest, version 3.6.1) and with Confluent, and I got different results. With local Kafka, we experienced option a), while with Confluent we experienced option b).

Not sure how you did the test setup, but the event could be processed by both consumers in parallel for both setups. Maybe it was just a race condition that prevented you to not see behavior (a) with Confluent Cloud; Confluent Cloud does not work any different compared to Apache Kafka.

Q3: It appears that when processing takes longer than max.poll.interval.ms , the batch is rollbacked. Then the same batch will be fetched and processed again. Since the batch cannot be processed on time, it will always be rollbacked, then reprocessed indefinitely. I see this behavior in almost all my tests, except for one particular combination of max.poll.interval = 5s and processing_time_for_E = 15s. In this case, the batch seems to be rollbacked a couple of times, then at one point the Consumer just breaks out of the loop and continues processing the next batches normally. Which is the correct behavior?

In general, that depends on commit.interval.ms config. When Kafka Streams gets a batch of messages, it would commit a “partial” batch after commit.interval.ms passed. Thus, after hitting max.poll.interval.ms on the “poison pill message”, it might not roll back the entire batch, but only to the last committer offset. – Given your example of max.poll.interval.ms = 5s and “processing time = 15s” it seems it should always get stuck exactly at E and should not be able to step over it – it’s unclear how it would have happened that the app made process beyond E.

Q4: I was expecting to see that Consumers receive batches of at most max.poll.records events. In our scenario, however, it looks as if the Consumer processes more than 10 events (the batch size as configured in the stream) before the rollback. It looks like (after the first rollback) the Consumer processes a random number of events (between 10 and 19), on every cycle of re-consume batch & rollback. I see there is an internal optimization in the poll method (while processing the received batch, Kafka seems to send the next fetch ahead of time). Is this normal behavior? Should we expect that the batch size is not exactly what we set as max.poll.records ?

The overall architecture is a little bit more complex. Kafka Streams also has an internal record buffer (that you can configure via buffered.records.per.partition; the config max.poll.records only applies to a single call to poll(), but Kafka Streams might call poll(), pull 10 records into it’s buffer, process 5, and call poll() again and put another 10 record into the buffer so the buffer size grows to 15. Now, Kafka Streams might process all 15 records before calling poll() again. (This talk might help to understand the architecture better: The Nuts and Bolts of Kafka Streams---An Architectural Deep Dive | Current 2023)

And yes, the consumer itself will also send the next fetch right away, and how much data is transferred over the network, is configured independently – it has nothing to do with max.poll.records (cf consumer configs containing fetch in their name). In the end, a single fetch request could fetch 100 records, and if your max.poll.records config is 20 for example, 5 calls to poll() would be served with a single network fetch to the brokers, ie, 1st poll() will trigger a fetch request, and calls No2 to No5 of poll() would get date from the consumer’s internal buffer (the 5th call would trigger the next fetch request because the buffer would be empty now, in the hope that more data is already received before the 6th call to poll() happen so latency is reduced – it’s a technique called “pre-fetching” and or “read ahead” and used in many systems).

Big thanks for the prompt and detailed response! I wrote my comments inline below.

All clear now :slight_smile:

I have played a bit more with the values of processing_time. The original tests were performed with a fix value of max.poll.interval.ms = 5s, and processing_time ranging from 15s to 30s. I redid the tests with processing time going up to 2 mins and experienced the behavior you described. I guess 30s was a bit less than how long rebalancing took in my Docker environment vs Confluent, where rebalancing was perhaps faster, and I could immediately see the same event processed by another Consumer.

I am able to consistently reproduce this behavior, both on Apache Kafka and Confluent. I have listed below the console output of the Consumer and the logs extracted from the Kafka Server (the ones from Docker, as I don’t have access to the ones in Confluent). The thing I notice is that for some reason there are some delete log and delete offset entries in the server log, after which the consumer continues past event E.

Start Streams App
2024-01-29T09:12:07.460374700Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 0
2024-01-29T09:12:07.478503700Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 1
2024-01-29T09:12:07.479996400Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 2
2024-01-29T09:12:07.479996400Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 3
2024-01-29T09:12:07.479996400Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 4
2024-01-29T09:12:07.479996400Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 5
2024-01-29T09:12:07.480497500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 6
2024-01-29T09:12:07.480497500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 7
2024-01-29T09:12:07.480497500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 8
2024-01-29T09:12:07.480497500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 9
2024-01-29T09:12:07.480998Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 10
2024-01-29T09:12:22.484061200Z Finished long processing
2024-01-29T09:12:22.484061200Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 11
2024-01-29T09:12:22.484061200Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 12
2024-01-29T09:12:22.485055600Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 13
2024-01-29T09:12:22.485055600Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 14
10:12:22.492 [kafka-producer-network-thread | streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-producer] ERROR org.apache.kafka.streams.processor.internals.RecordCollectorImpl - stream-thread [streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1] stream-task [0_0] Error encountered sending record to topic destination1 for task 0_0 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
2024-01-29T09:12:25.702952400Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 0
2024-01-29T09:12:25.705627500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 1
2024-01-29T09:12:25.706140Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 2
2024-01-29T09:12:25.706140Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 3
2024-01-29T09:12:25.706140Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 4
2024-01-29T09:12:25.706673400Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 5
2024-01-29T09:12:25.706673400Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 6
2024-01-29T09:12:25.707210300Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 7
2024-01-29T09:12:25.707210300Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 8
2024-01-29T09:12:25.707727100Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 9
2024-01-29T09:12:25.708248500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 10
2024-01-29T09:12:40.717072500Z Finished long processing
2024-01-29T09:12:40.717072500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 11
2024-01-29T09:12:40.717072500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 12
2024-01-29T09:12:40.717072500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 13
2024-01-29T09:12:40.718070Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 14
2024-01-29T09:12:40.718079Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 15
2024-01-29T09:12:40.718079Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 16
2024-01-29T09:12:44.007242200Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 17
2024-01-29T09:12:44.009902100Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 18
2024-01-29T09:12:44.009902100Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 19
2024-01-29T09:12:44.009902100Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 20
2024-01-29T09:12:44.009902100Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 21
2024-01-29T09:12:44.010418500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 22
2024-01-29T09:12:44.010418500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 23
2024-01-29T09:12:44.010418500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 24
2024-01-29T09:12:44.010418500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 25
2024-01-29T09:12:44.010418500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 26
2024-01-29T09:12:44.010934500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 27
2024-01-29T09:12:44.011451100Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 28
2024-01-29T09:12:44.011451100Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 29
2024-01-29T09:12:44.011451100Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 30
2024-01-29T09:12:44.011451100Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 31
2024-01-29T09:12:44.011451100Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 32
2024-01-29T09:12:44.011451100Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 33
2024-01-29T09:12:44.011967500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 34
2024-01-29T09:12:44.011967500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 35
2024-01-29T09:12:44.011967500Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 36
2024-01-29T09:12:44.012786700Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 37
2024-01-29T09:12:44.012786700Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 38
2024-01-29T09:12:44.012786700Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 39
2024-01-29T09:12:44.012786700Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 40
2024-01-29T09:12:44.012786700Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 41
2024-01-29T09:12:44.013273300Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 42
2024-01-29T09:12:44.013273300Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 43
2024-01-29T09:12:44.013273300Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 44
2024-01-29T09:12:44.013273300Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 45
2024-01-29T09:12:44.013273300Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 46
2024-01-29T09:12:44.013772900Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 47
2024-01-29T09:12:44.013772900Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 48
2024-01-29T09:12:44.013772900Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 49
2024-01-29T09:12:44.014273Z Thread: streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1. Consumed: 50

Process finished with exit code 130
2024-01-29 10:11:55 [2024-01-29 09:11:55,005] INFO [QuorumController id=0] CreateTopics result(s): CreatableTopic(name='destination1', numPartitions=1, replicationFactor=-1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
2024-01-29 10:11:55 [2024-01-29 09:11:55,005] INFO [QuorumController id=0] Replayed TopicRecord for topic destination1 with topic ID Va93w6-4TWCBh-YTToo_pA. (org.apache.kafka.controller.ReplicationControlManager)
2024-01-29 10:11:55 [2024-01-29 09:11:55,005] INFO [QuorumController id=0] Replayed PartitionRecord for new partition destination1-0 with topic ID Va93w6-4TWCBh-YTToo_pA and PartitionRegistration(replicas=[0], isr=[0], removingReplicas=[], addingReplicas=[], leader=0, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
2024-01-29 10:11:55 [2024-01-29 09:11:55,014] INFO [GroupCoordinator 0]: Removed 0 offsets associated with deleted partitions: destination1-0. (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:11:55 [2024-01-29 09:11:55,014] INFO [DynamicConfigPublisher broker id=0] Updating topic destination1 with new configuration :  (kafka.server.metadata.DynamicConfigPublisher)
2024-01-29 10:11:55 [2024-01-29 09:11:55,033] INFO [Broker id=0] Transitioning 1 partition(s) to local leaders. (state.change.logger)
2024-01-29 10:11:55 [2024-01-29 09:11:55,033] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(destination1-0) (kafka.server.ReplicaFetcherManager)
2024-01-29 10:11:55 [2024-01-29 09:11:55,033] INFO [Broker id=0] Creating new partition destination1-0 with topic id Va93w6-4TWCBh-YTToo_pA. (state.change.logger)
2024-01-29 10:11:55 [2024-01-29 09:11:55,035] INFO [LogLoader partition=destination1-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
2024-01-29 10:11:55 [2024-01-29 09:11:55,036] INFO Created log for partition destination1-0 in /bitnami/kafka/data/destination1-0 with properties {} (kafka.log.LogManager)
2024-01-29 10:11:55 [2024-01-29 09:11:55,036] INFO [Partition destination1-0 broker=0] Log loaded for partition destination1-0 with initial high watermark 0 (kafka.cluster.Partition)
2024-01-29 10:11:55 [2024-01-29 09:11:55,036] INFO [Broker id=0] Leader destination1-0 with topic id Some(Va93w6-4TWCBh-YTToo_pA) starts at leader epoch 0 from offset 0 with partition epoch 0, high watermark 0, ISR [0], adding replicas [] and removing replicas [] . Previous leader None and previous leader epoch was -1. (state.change.logger)
2024-01-29 10:12:04 [2024-01-29 09:12:04,160] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group streaming-app-id2 in Empty state. Created a new member id streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-bd0a4210-b59f-4e39-aa0f-ff93dc2b0e77 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:04 [2024-01-29 09:12:04,163] INFO [GroupCoordinator 0]: Preparing to rebalance group streaming-app-id2 in state PreparingRebalance with old generation 0 (__consumer_offsets-46) (reason: Adding new member streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-bd0a4210-b59f-4e39-aa0f-ff93dc2b0e77 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:07 [2024-01-29 09:12:07,166] INFO [GroupCoordinator 0]: Stabilized group streaming-app-id2 generation 1 (__consumer_offsets-46) with 1 members (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:07 [2024-01-29 09:12:07,189] INFO [GroupCoordinator 0]: Assignment received from leader streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-bd0a4210-b59f-4e39-aa0f-ff93dc2b0e77 for group streaming-app-id2 for generation 1. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:07 [2024-01-29 09:12:07,431] INFO [TransactionCoordinator id=0] Initialized transactionalId streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-1 with producerId 14 and producer epoch 0 on partition __transaction_state-31 (kafka.coordinator.transaction.TransactionCoordinator)
2024-01-29 10:12:21 [2024-01-29 09:12:21,669] INFO [TransactionCoordinator id=0] Completed rollback of ongoing transaction for transactionalId streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-1 due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
2024-01-29 10:12:22 [2024-01-29 09:12:22,522] INFO [GroupCoordinator 0]: Preparing to rebalance group streaming-app-id2 in state PreparingRebalance with old generation 1 (__consumer_offsets-46) (reason: Removing member streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-bd0a4210-b59f-4e39-aa0f-ff93dc2b0e77 on LeaveGroup; client reason: consumer poll timeout has expired.) (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:22 [2024-01-29 09:12:22,522] INFO [GroupCoordinator 0]: Group streaming-app-id2 with generation 2 is now empty (__consumer_offsets-46) (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:22 [2024-01-29 09:12:22,523] INFO [GroupCoordinator 0]: Member MemberMetadata(memberId=streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-bd0a4210-b59f-4e39-aa0f-ff93dc2b0e77, groupInstanceId=None, clientId=streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer, clientHost=/172.21.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=5000, supportedProtocols=List(stream)) has left group streaming-app-id2 through explicit `LeaveGroup`; client reason: consumer poll timeout has expired. (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:22 [2024-01-29 09:12:22,523] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group streaming-app-id2 in Empty state. Created a new member id streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-77a3b34f-7033-41f6-ad40-717cefb2eaab and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:22 [2024-01-29 09:12:22,527] INFO [GroupCoordinator 0]: Preparing to rebalance group streaming-app-id2 in state PreparingRebalance with old generation 2 (__consumer_offsets-46) (reason: Adding new member streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-77a3b34f-7033-41f6-ad40-717cefb2eaab with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:25 [2024-01-29 09:12:25,530] INFO [GroupCoordinator 0]: Stabilized group streaming-app-id2 generation 3 (__consumer_offsets-46) with 1 members (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:25 [2024-01-29 09:12:25,535] INFO [GroupCoordinator 0]: Assignment received from leader streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-77a3b34f-7033-41f6-ad40-717cefb2eaab for group streaming-app-id2 for generation 3. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:25 [2024-01-29 09:12:25,695] INFO [TransactionCoordinator id=0] Initialized transactionalId streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-1 with producerId 14 and producer epoch 2 on partition __transaction_state-31 (kafka.coordinator.transaction.TransactionCoordinator)
2024-01-29 10:12:40 [2024-01-29 09:12:40,858] INFO [GroupCoordinator 0]: Preparing to rebalance group streaming-app-id2 in state PreparingRebalance with old generation 3 (__consumer_offsets-46) (reason: Removing member streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-77a3b34f-7033-41f6-ad40-717cefb2eaab on LeaveGroup; client reason: consumer poll timeout has expired.) (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:40 [2024-01-29 09:12:40,858] INFO [GroupCoordinator 0]: Group streaming-app-id2 with generation 4 is now empty (__consumer_offsets-46) (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:40 [2024-01-29 09:12:40,859] INFO [GroupCoordinator 0]: Member MemberMetadata(memberId=streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-77a3b34f-7033-41f6-ad40-717cefb2eaab, groupInstanceId=None, clientId=streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer, clientHost=/172.21.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=5000, supportedProtocols=List(stream)) has left group streaming-app-id2 through explicit `LeaveGroup`; client reason: consumer poll timeout has expired. (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:40 [2024-01-29 09:12:40,860] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group streaming-app-id2 in Empty state. Created a new member id streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-93e2bbfb-7faa-4029-b2e3-e5201d92a289 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:40 [2024-01-29 09:12:40,862] INFO [GroupCoordinator 0]: Preparing to rebalance group streaming-app-id2 in state PreparingRebalance with old generation 4 (__consumer_offsets-46) (reason: Adding new member streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-93e2bbfb-7faa-4029-b2e3-e5201d92a289 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException) (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:43 [2024-01-29 09:12:43,865] INFO [GroupCoordinator 0]: Stabilized group streaming-app-id2 generation 5 (__consumer_offsets-46) with 1 members (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:43 [2024-01-29 09:12:43,869] INFO [GroupCoordinator 0]: Assignment received from leader streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-93e2bbfb-7faa-4029-b2e3-e5201d92a289 for group streaming-app-id2 for generation 5. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:12:43 [2024-01-29 09:12:43,998] INFO [TransactionCoordinator id=0] Initialized transactionalId streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-1 with producerId 14 and producer epoch 3 on partition __transaction_state-31 (kafka.coordinator.transaction.TransactionCoordinator)
2024-01-29 10:12:54 [2024-01-29 09:12:54,916] INFO [LocalLog partition=source1-0, dir=/bitnami/kafka/data] Deleting segments as the log has been deleted: LogSegment(baseOffset=0, size=11096, lastModifiedTime=1706519148184, largestRecordTimestamp=Some(1706519148196)) (kafka.log.LocalLog)
2024-01-29 10:12:54 [2024-01-29 09:12:54,916] INFO [LocalLog partition=source1-0, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=0, size=11096, lastModifiedTime=1706519148184, largestRecordTimestamp=Some(1706519148196)) (kafka.log.LocalLog$)
2024-01-29 10:12:54 [2024-01-29 09:12:54,916] INFO Deleted log /bitnami/kafka/data/source1-0.20a2da5641434b4ca7565a36e087c7f6-delete/00000000000000000000.log.deleted. (kafka.log.LogSegment)
2024-01-29 10:12:54 [2024-01-29 09:12:54,917] INFO Deleted offset index /bitnami/kafka/data/source1-0.20a2da5641434b4ca7565a36e087c7f6-delete/00000000000000000000.index.deleted. (kafka.log.LogSegment)
2024-01-29 10:12:54 [2024-01-29 09:12:54,917] INFO Deleted time index /bitnami/kafka/data/source1-0.20a2da5641434b4ca7565a36e087c7f6-delete/00000000000000000000.timeindex.deleted. (kafka.log.LogSegment)
2024-01-29 10:12:54 [2024-01-29 09:12:54,917] INFO Deleted log for partition source1-0 in /bitnami/kafka/data/source1-0.20a2da5641434b4ca7565a36e087c7f6-delete. (kafka.log.LogManager)
2024-01-29 10:12:55 [2024-01-29 09:12:55,007] INFO [LocalLog partition=destination1-0, dir=/bitnami/kafka/data] Deleting segments as the log has been deleted: LogSegment(baseOffset=0, size=4570, lastModifiedTime=1706519147794, largestRecordTimestamp=Some(1706519147803)) (kafka.log.LocalLog)
2024-01-29 10:12:55 [2024-01-29 09:12:55,007] INFO [LocalLog partition=destination1-0, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=0, size=4570, lastModifiedTime=1706519147794, largestRecordTimestamp=Some(1706519147803)) (kafka.log.LocalLog$)
2024-01-29 10:12:55 [2024-01-29 09:12:55,008] INFO Deleted log /bitnami/kafka/data/destination1-0.5d1b1556818740f18e55aa0c57376d9b-delete/00000000000000000000.log.deleted. (kafka.log.LogSegment)
2024-01-29 10:12:55 [2024-01-29 09:12:55,008] INFO Deleted offset index /bitnami/kafka/data/destination1-0.5d1b1556818740f18e55aa0c57376d9b-delete/00000000000000000000.index.deleted. (kafka.log.LogSegment)
2024-01-29 10:12:55 [2024-01-29 09:12:55,008] INFO Deleted time index /bitnami/kafka/data/destination1-0.5d1b1556818740f18e55aa0c57376d9b-delete/00000000000000000000.timeindex.deleted. (kafka.log.LogSegment)
2024-01-29 10:12:55 [2024-01-29 09:12:55,008] INFO Deleted transaction index /bitnami/kafka/data/destination1-0.5d1b1556818740f18e55aa0c57376d9b-delete/00000000000000000000.txnindex.deleted. (kafka.log.LogSegment)
2024-01-29 10:12:55 [2024-01-29 09:12:55,008] INFO Deleted log for partition destination1-0 in /bitnami/kafka/data/destination1-0.5d1b1556818740f18e55aa0c57376d9b-delete. (kafka.log.LogManager)
2024-01-29 10:13:28 [2024-01-29 09:13:28,870] INFO [GroupCoordinator 0]: Member streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-93e2bbfb-7faa-4029-b2e3-e5201d92a289 in group streaming-app-id2 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:13:28 [2024-01-29 09:13:28,870] INFO [GroupCoordinator 0]: Preparing to rebalance group streaming-app-id2 in state PreparingRebalance with old generation 5 (__consumer_offsets-46) (reason: removing member streaming-app-id2-9caebd45-c99a-43f1-8e64-ab851b749748-StreamThread-1-consumer-93e2bbfb-7faa-4029-b2e3-e5201d92a289 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
2024-01-29 10:13:28 [2024-01-29 09:13:28,870] INFO [GroupCoordinator 0]: Group streaming-app-id2 with generation 6 is now empty (__consumer_offsets-46) (kafka.coordinator.group.GroupCoordinator)

Got it, thanks for the detailed explanation