Consumers complaining about unstable offsets?

Hello,

we are using Spring Kafka together with Kafka 2.5. We have a cluster consisting of 3 brokers, with a min ISR of 1 and a replication factor of 2.

For some time now, we have been seeing the following types of log messages:

[Consumer clientId=consumer-reporting-service-30, groupId=reporting-service] The following partitions still have unstable offsets which are not cleared on the broker side: [invoice-service-1], this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log

After researching these INFO logs a bit, we unfortunately found very little information about unstable offsets, aside from some source code documentation:

  /**
   * An unstable offset is one which is either undecided (i.e. its ultimate outcome is not yet known),
   * or one that is decided, but may not have been replicated (i.e. any transaction which has a COMMIT/ABORT
   * marker written at a higher offset than the current high watermark).
   */

What makes this more interesting is that, in some cases, these log messages are almost exactly transaction.max.timeout.ms (900000 ms) apart:

Oct 14, 2021 @ 11:18:17.637 [Consumer clientId=consumer-coupon-service-16, groupId=coupon-service] Fetching committed offsets for partitions: [invoice-service-8]
Oct 14, 2021 @ 11:18:17.639 [Consumer clientId=consumer-coupon-service-16, groupId=coupon-service] Failed to fetch offset for partition invoice-service-8: There are unstable offsets that need to be cleared.
Oct 14, 2021 @ 11:18:17.639 [Consumer clientId=consumer-coupon-service-16, groupId=coupon-service] The following partitions still have unstable offsets which are not cleared on the broker side: [invoice-service-8], this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log
Oct 14, 2021 @ 11:18:17.659 [Consumer clientId=consumer-coupon-service-6, groupId=coupon-service] Committed offset 123345052 for partition invoice-service-8
// 15m later 
Oct 14, 2021 @ 11:33:17.637 [Consumer clientId=consumer-coupon-service-16, groupId=coupon-service] The following partitions still have unstable offsets which are not cleared on the broker side: [invoice-service-8], this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log

Going by the following diagram of the log end offset (LEO) and high watermark (HW):

It seems to me that an unstable offset is indicative of replication lag here, i.e. the offsets are not replicated fast enough in some cases, triggering a timeout. However, in that case I would have expected replica.lag.time.max.ms to kick in:

If a follower hasn’t sent any fetch requests or hasn’t consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr

However, we checked the current replication offsets, and the ISR appears to be behaving properly. Similarly, the broker logs themselves did not point to any obvious problems.
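
(For reference, this is the kind of ISR check I mean; a minimal sketch using the Kafka AdminClient, with the broker address and topic name as placeholders. The same information is available via the kafka-topics command line tool.)

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class IsrCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");

        try (Admin admin = Admin.create(props)) {
            // Describe the topic and print leader, replicas and ISR per partition
            TopicDescription description = admin
                    .describeTopics(Collections.singleton("invoice-service"))
                    .all().get()
                    .get("invoice-service");

            description.partitions().forEach(p ->
                    System.out.printf("partition=%d leader=%s replicas=%s isr=%s%n",
                            p.partition(), p.leader(), p.replicas(), p.isr()));
        }
    }
}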

Does anyone have an idea as to what we could further investigate in this case?

Check your producers' logs to that topic.

Are they failing to commit successful transactions, or to abort them, within the timeout?

Did somebody configure long maximum transaction timeouts?

Did somebody mix transactional and non-transactional producers on the same topic?


Hi @sarwarbhuiyan, thanks for taking the time.

The logs of our producers to this topic (and to the other topics showing the same issue/log message) unfortunately show nothing out of the ordinary. In particular, we don’t see any errors or warnings on the producer side.

Our producer configuration (with the exception of some KStreams-based producers) is identical for all of the producers experiencing this issue. The producer and consumer configs (I should have included them at the start, my apologies) are as follows:

ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [broker-1:9092, broker-2:9092, broker-3:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = producer-3
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

Consumer Config:

ConsumerConfig values: 
	allow.auto.create.topics = true
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [broker-1:9092, broker-2:9092, broker-3:9092]
	check.crcs = true
	client.dns.lookup = default
	client.id = 
	client.rack = 
	connections.max.idle.ms = 540000
	default.api.timeout.ms = 60000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = reporting-service
	group.instance.id = null
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

In particular, as you can see from the consumer config above, offset auto-commit is enabled (enable.auto.commit = true) with an interval of 5000 ms. In addition, max.poll.records=500 and max.poll.interval.ms=300000 (= 300 s).

Other settings that may have an impact:

  1. Our producers are synchronous.
  2. Our producers for the topics showing these issues are non-transactional (see the sketch below).
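
To illustrate both points, a minimal sketch of a synchronous, non-transactional send (topic name and payload are placeholders; blocking on the returned future is one common way to make a send synchronous):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class SyncSend {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        // No transactional.id and no idempotence: a plain, non-transactional producer.

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<>("invoice-service", "payload".getBytes());
            // Blocking on the future makes the send effectively synchronous.
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("written to %s-%d @ offset %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
}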

Does any of that information help? As I said, we do not see any particularly suspicious logs on the broker side. Are there any targeted DEBUG logs we could enable to dig deeper and find out what is causing this, perhaps?

It looks like the producers are non-transactional, since transactional.id is null. Are there any other producers to those topics (e.g. Kafka Streams applications using exactly-once semantics)?

Sorry for the late reply, it’s been a busy year :slight_smile:
There are indeed multiple producers to this topic, although they all behave identically (replicated service instances). These are all non-transactional (think: service receives HTTP request and produces one or more events after the request is successful).

There are no Kafka Streams producers to this topic, though there are Kafka Streams consumers. I don’t think that should matter here, since the log message comes from a non-Kafka-Streams service.

Is there any other information I could provide to shed some light on this log message?

Then there’s probably something somewhere (whether by design or accidentally through a library/framework, e.g. Spring with a @Transactional annotation) which is using transactions, while at the same time there are producers which are NOT using transactions.

You need to do a full audit of all the places where producers are configured and remove transactional semantics there. Do the same with consumers: set isolation.level to read_uncommitted if you don’t really care about transactions.
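
To make that audit concrete, a minimal sketch of the two client properties in question (the constants come from the standard Java clients; nothing here is specific to your setup):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

public class NonTransactionalClients {
    public static void main(String[] args) {
        // Producer side: a producer only becomes transactional when this property is set;
        // for a plain producer (as in the config above), simply leave it out.
        System.out.println("must stay unset: " + ProducerConfig.TRANSACTIONAL_ID_CONFIG);

        // Consumer side: read_uncommitted is the default isolation level; read_committed
        // makes the consumer stop at the last stable offset of open transactions.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
        System.out.println(consumerProps);
    }
}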

Thanks for the quick reply.

Then there’s probably something somewhere (whether by design or accidentally through a library/framework, e.g. Spring with a @Transactional annotation) which is using transactions, while at the same time there are producers which are NOT using transactions.

I’m not 100% sure I follow, but from my understanding this should not be possible. We do use Spring Cloud Stream with the Kafka Binder for producing, but we do not set a transactionIdPrefix (which maps to transactional.id for the Kafka producer). That setting is required for transactional producers to be active, is not set by default, and is not set in the ProducerConfig above.
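
To illustrate, a minimal sketch with plain Spring Kafka rather than the Cloud Stream binder (the prefix value is made up): a DefaultKafkaProducerFactory only becomes transaction-capable once a transaction id prefix is set on it, which, as far as I understand, is what the binder’s transactionIdPrefix ultimately configures.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

public class TransactionCapableCheck {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        DefaultKafkaProducerFactory<byte[], byte[]> pf =
                new DefaultKafkaProducerFactory<>(props);
        System.out.println(pf.transactionCapable()); // false: no transaction id prefix set

        // Only this call (or the equivalent binder/Boot property) would switch the factory,
        // and hence every producer it creates, to transactional mode.
        // pf.setTransactionIdPrefix("tx-");  // "tx-" is a made-up example value
    }
}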

To add a little more detail to the request flow for clarity:

  1. HTTP request comes in.
  2. Data is saved to the DB in a database transaction (via, yes, @Transactional).
  3. In the same transaction, the records-to-be-published are added to a type of “outbox”.
  4. As an after-commit hook, all records that were part of that transaction are published in order (non-transactionally); see the sketch below.
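
To make step 4 a bit more concrete, a simplified sketch of the kind of after-commit hook we mean (class and method names are illustrative, not our actual code, and it assumes Spring Framework 5.3+, where TransactionSynchronization has default methods):

import java.util.List;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

// Illustrative only: publishes the "outbox" records of the current DB transaction
// after that transaction has committed, using a plain (non-transactional) producer.
public class OutboxPublisher {

    private final KafkaTemplate<byte[], byte[]> kafkaTemplate;

    public OutboxPublisher(KafkaTemplate<byte[], byte[]> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /** Called from inside the @Transactional service method (step 3). */
    public void publishAfterCommit(String topic, List<byte[]> outboxRecords) {
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @Override
            public void afterCommit() {
                // Step 4: publish in order, non-transactionally, once the DB commit succeeded.
                outboxRecords.forEach(value -> kafkaTemplate.send(topic, value));
            }
        });
    }
}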

At no point in this flow do we use Kafka transactions, and none of our producers has a configured transactional.id. Am I completely missing/misunderstanding something?

Do the same with consumers: set isolation.level to read_uncommitted if you don’t really care about transactions.

Yes, that is already the case for the consumers of this topic (see ConsumerConfig above).