Producer Fenced Exceptions in Kafka Streams 2.7

Hi,
I recently upgraded our kafka-streams client from 2.2.1 to 2.8.2 following the upgrade guide. Since the upgrade, we've been seeing more stalled apps and many more ProducerFencedException and InvalidProducerEpochException errors.
(The brokers are on Kafka 2.7 and were running that version previously without any issues.)
Our apps use the original EOS semantics (processing.guarantee=exactly_once, not the EOS beta/v2). Are there any specific changes between these versions that could cause this, and any ideas on how we could mitigate it?
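For context, here is a minimal sketch of the relevant part of our Streams config (the application id and bootstrap servers below are placeholders, not our real values; everything else is at defaults):

```java
import java.util.Properties;

public class EosConfigSketch {
    // Sketch of the EOS-related settings we run with. "exactly_once" is the
    // original EOS guarantee (one transactional producer per task), as opposed
    // to "exactly_once_beta" (EOS v2, one producer per thread) added in 2.6.
    public static Properties eosProps() {
        Properties props = new Properties();
        props.put("application.id", "entity-job-runner");   // placeholder
        props.put("bootstrap.servers", "broker:9092");      // placeholder
        props.put("processing.guarantee", "exactly_once");  // EOS v1
        return props;
    }

    public static void main(String[] args) {
        System.out.println(eosProps().getProperty("processing.guarantee"));
    }
}
```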

Some client-side stack traces are below.

2022-11-03 17:57:53,223 ERROR [kafka-producer-network-thread | entity-job-runner-update-entities-entity-job-v0.0-gen-app43.nj1.yext.com-StreamThread-2-0_10-producer] RecordCollectorImpl - stream-thread [entity-job-runner-update-entities-entity-job-v0.0-gen-app-StreamThread-2] task [0_10] Error encountered sending record to topic job-results for task 0_10 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
2022-11-03 17:57:53,653 WARN  [entity-job-runner-update-entities-entity-job-v0.0-gen-app-StreamThread-2] StreamThread - stream-thread [entity-job-runner-update-entities-entity-job-v0.0-gen-app-StreamThread-2] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.
org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic job-results for task 0_10 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:215) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:707) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:693) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:640) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:574) ~[EntityJobRunner_deploy.jar:?]
	at java.util.ArrayList.forEach(ArrayList.java:1511) ~[?:?]
	at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:561) ~[EntityJobRunner_deploy.jar:?]
	at java.lang.Iterable.forEach(Iterable.java:75) ~[?:?]
	at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:561) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$3(Sender.java:785) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327) ~[EntityJobRunner_deploy.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242) ~[EntityJobRunner_deploy.jar:?]
	at java.lang.Thread.run(Thread.java:832) ~[?:?]
Caused by: org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.

Another stack trace:

2022-11-04 10:36:23,907 ERROR [dataingestion-extraction-transform-mapping-load-entity-v0.1-gen-app1-StreamThread-8] ConsumerCoordinator - [Consumer clientId=dataingestion-extraction-transform-mapping-load-entity-v0.1-gen-app1-StreamThread-8-consumer, groupId=dataingestion-extraction-transform-mapping-load-entity-v0.1] User provided listener org.apache.kafka.streams.processor.internals.StreamsRebalanceListener failed on invocation of onPartitionsRevoked for partitions [extract-transform-map-data-20]
org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced trying to commit a transaction [stream-thread [dataingestion-extraction-transform-mapping-load-entity-v0.1-gen-app1-StreamThread-8] task [0_20]]; it means all tasks belonging to this thread should be migrated.
	at org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:252) ~[DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1107) ~[DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:541) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:95) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:315) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:411) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:511) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) [DataIngestionRunner_deploy.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [DataIngestionRunner_deploy.jar:?]
Caused by: org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.

Would increasing transaction.timeout.ms on the Streams producer (or raising the broker-side cap, transaction.max.timeout.ms) help with this? Or would raising max.poll.interval.ms to a higher value help? (It seems the Streams default for max.poll.interval.ms was changed from Integer.MAX_VALUE to 5 minutes.)
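Concretely, the overrides we're considering would look something like the sketch below (the values are guesses for illustration, not tested recommendations; `producer.` is the Streams prefix for producer-level configs):

```java
import java.util.Properties;

public class TimeoutTuningSketch {
    // Sketch of candidate overrides:
    //  - producer.transaction.timeout.ms: the producer-side transaction timeout
    //    (Streams applies its own shorter default under EOS, if I remember
    //    right); it is capped by the broker's transaction.max.timeout.ms.
    //  - max.poll.interval.ms: raised above the 5-minute Streams default so a
    //    slow poll loop is less likely to drop the thread from the group.
    public static Properties tuningOverrides() {
        Properties props = new Properties();
        props.put("producer.transaction.timeout.ms", "60000"); // 1 min, illustrative
        props.put("max.poll.interval.ms", "600000");           // 10 min, illustrative
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tuningOverrides().getProperty("max.poll.interval.ms"));
    }
}
```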
I'd also like to understand why a client upgrade alone would cause this.

Appreciate any help here,

Thanks,
Vinay