We have a Kafka Streams application consuming 2 topics (I1 and I2, X partitions each), doing some transformation and filtering based on message headers and routing the result to output topics (O1, … ON). We run this application in a Kubernetes cluster as a Deployment with the number of replicas (Pods) equal to X.
I1 and I2 represent more or less the same data: I1 contains the whole CDC (Change Data Capture) history and is slightly behind I2, while I2 contains only fresh data. The idea was to consume both I1 and I2, use just the I1 data, and then switch to I2 at runtime. So far everything works as expected.
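For illustration, here is a simplified sketch of what such a topology might look like. String serdes, the placeholder topic names (I1, I2, O1) and a hypothetical `target` header used for routing are assumptions; the real header-based filtering and the I1→I2 switch-over logic are only hinted at:

```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class CdcRouter {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Both input topics carry (roughly) the same data; names are placeholders.
        KStream<String, String> history = builder.stream("I1", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> fresh   = builder.stream("I2", Consumed.with(Serdes.String(), Serdes.String()));

        // Merge both sources; the actual "use I1, then switch to I2" logic is
        // application-specific and not shown here.
        KStream<String, String> merged = history.merge(fresh);

        merged
            // Stand-in for the real header-based filtering/transformation.
            .filter((key, value) -> value != null)
            // Route each record to an output topic chosen at runtime from a
            // (hypothetical) "target" header; fall back to "O1" if it is missing.
            .to((key, value, recordContext) -> {
                Header target = recordContext.headers().lastHeader("target");
                return target != null ? new String(target.value()) : "O1";
            });

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cdc-router");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```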
We do our best to keep incoming data clean and valid, but sometimes we get into a situation where the application fails, either because of invalid data or some internal bug. That’s not a problem if the application can recover by itself. But if it can’t, we end up in a “failed application → failed pod → rescheduled pod → restarted app” loop. In our case it’s even worse: another pod starts consuming the failing partition and fails too. Almost immediately all of our pods fail and get stuck in a restart loop for the same reason.
I wonder if there is a clean way to keep the healthy part of the app (the valid I1/I2 partitions) running and restrict failures to just the instance (Pod) that was originally assigned the partition causing the errors. This way we could keep the whole pipeline running while investigating / fixing the issue.
My initial idea was to switch from a Deployment to a StatefulSet and somehow assign partitions to specific instances by name, for example app-0 consumes just partition “0” and app-N just partition “N”. But Kafka Streams locks you to a specific partition assignor (StreamsPartitionAssignor), and the only customisable piece there is the task assignor (via config settings marked as “internal”).
What else can be done?
Kafka Streams partition assignment is based on the underlying KafkaConsumer’s group management. Thus, manual assignment is not possible.
What you could try is static group membership. In this case, if one pod fails, no rebalance happens (if configured accordingly), and thus only that one pod (and all its assigned partitions) goes offline, while all the others keep running.
You still cannot pin/assign partitions, but because you avoid a rebalance, the “bad” partition is not re-assigned and thus does not kill the other pods, too. (Of course, if a single pod has multiple partitions assigned and only one of them is bad, all of those partitions go offline as the whole pod goes offline, and none of them is re-assigned to other pods.)
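As a rough sketch of what “configured accordingly” could look like, assuming brokers and clients on 2.3+ and a stable, unique name per pod (here a hypothetical podName, e.g. the pod name injected via the Kubernetes downward API):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class StaticMembershipConfig {

    // Builds Streams properties with static group membership enabled.
    // "podName" is an assumption: a stable, unique name per instance.
    static Properties buildProps(String podName) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // Stable member id per instance -- enables static membership (brokers/clients 2.3+).
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), podName);

        // Give a restarting pod time to rejoin before the broker evicts it and
        // triggers a rebalance; tune this to your typical restart time.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 60_000);

        return props;
    }
}
```

The key points are a stable group.instance.id per instance and a session.timeout.ms large enough to cover a pod restart, so the broker does not trigger a rebalance while the pod comes back.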