Consuming messages in .NET client issues

Hello all,

I’m writing a Windows Service to consume certain messages from a Kafka server and I have a problem with it.
I can consume messages (my sample code is below), and it works for about 2-3 hours.
After that time I get errors that the partition is revoked. So I tried using a fixed time in the Consume method (with value 300000) in stead of a cancellation token and then restart after the Consume stops on it’s own after that timer.
I get the “Revoked”, and “Assigned” partition messages after this, but my Consume somehow doesn’t pick up new messages anymore after all this.
I have no clue what i’m doing wrong. Can anyone give suggestions on how to set this up?

Sidenote : I have to consume 5 topics, so I’m actually running each consumer on a thread as you can see in the following code.

Marc

   var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = ConfigurationManager.AppSettings["KafkaServer"],
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SslCaLocation = ConfigurationManager.AppSettings["KafkaPEMLocation"],
                SaslMechanism = SaslMechanism.Plain,
                GroupId = "TestInterface",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                SaslPassword = ConfigurationManager.AppSettings["KafkaPwd"],
                SaslUsername = ConfigurationManager.AppSettings["KafkaUser"]
            };

        var consumerWO = new ConsumerBuilder<Ignore, string>(consumerConfig)
            .SetErrorHandler((_, e) =>
            {
                Console.WriteLine($"Error: {e.Reason}");
                return;
            })
            .SetPartitionsAssignedHandler((c, partitions) =>
            {
                Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
            })
            .SetPartitionsRevokedHandler((c, partitions) =>
            {
                Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
            })
            .Build();
        {
            string topics = wotopicWorkOrder;
            consumerWO.Subscribe(topics);
            CancellationTokenSource source = new CancellationTokenSource();
            CancellationToken cancellationToken = source.Token;
            try
            {
                Thread t = new Thread(() =>
                {
                    try
                    {
                        isRunning = true;
                        while (!mustStop)
                        {
                            var consumeResult = consumerWO.Consume(cancellationToken );
                            if (consumeResult != null)
                            {
                                lock (this)
                                {
                                    if (consumeResult.IsPartitionEOF)
                                    {
                                        Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                                    }
                                    Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
                                    ProcessWO(consumeResult.Message);
                                }
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        logger.Error("Error during Consume WO : " + ex.ToString());
                    }
                });
                t.Start();
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Consume error: {e.Error.Reason}");
            }
        }