I’m being told that I need to convert over from RabbitMQ to Kafka.
With this in mind, I need to provide the same behavior as if I were working with a queue.
What I mean is, if I publish an OrderShipped message, it should be received by 1 and only 1 consumer, which would then send out an email. I never want to try to process this message again.
I’ve watched various videos and understand that there are fundamental differences between how Kafka and RabbitMQ operate.
Our codebase is .NET Core, so I will need to use Confluent’s .NET Client.
What I’m looking for:
- I will have 2 servers, each with 1 consumer of the OrderShipped topic - my understanding is that I will need 2 partitions and a GroupId
- each message should only be processed once - I realize the message will be sent to both - how do I prevent sending 2 emails?
- in the event the consumer restarts - pick up and continue from the first message that has not previously been processed
I am looking for a working .NET example that covers all these points and that I can take apart and understand. I don’t care about SerDes, Schema Registry, etc. at this point in time - whatever the example uses in that regard is fine.
If you have two consumers configured with the same group.id, they form a consumer group, and each partition is assigned to a single member of the group. This way, a message should be read by only one of the two consumers, not both.
To restart where you left off, you can commit offsets (either automatically based on a configurable time interval, or explicitly in your code). Note that Kafka has no per-message commits like a queue does; when you commit offset X, it means you have processed all messages with an offset smaller than X, and X is the next offset you want to read from.
Thus, in a failure case you might re-read some already-processed messages, because you resume from the last committed offset X. It is technically possible to commit after every processed message, but the overhead is quite high and it’s not recommended.
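A minimal sketch with Confluent’s .NET client of what that looks like in practice: two instances of this program (one per server) sharing the same GroupId form the consumer group, auto-commit is turned off, and the offset is committed explicitly after the work is done. The broker address, the GroupId value, and the email step are placeholders for your environment.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",        // placeholder broker address
    GroupId = "order-shipped-email",            // same GroupId on both servers => one consumer group
    AutoOffsetReset = AutoOffsetReset.Earliest, // only used when the group has no committed offsets yet
    EnableAutoCommit = false                    // commit explicitly after processing
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("OrderShipped");

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

try
{
    while (!cts.IsCancellationRequested)
    {
        var result = consumer.Consume(cts.Token);

        // Placeholder for the real work, e.g. sending the email.
        Console.WriteLine($"Processing {result.Message.Key} at {result.TopicPartitionOffset}");

        // Committing per message keeps the example simple ("at least once": a crash
        // between processing and commit means the message is redelivered on restart).
        // As noted above, in production you would typically commit less frequently.
        consumer.Commit(result);
    }
}
catch (OperationCanceledException) { }
finally
{
    consumer.Close(); // leave the group cleanly so partitions get reassigned promptly
}
```

With 2 partitions on OrderShipped and both instances in the same group, each instance ends up with one partition; if one instance dies, the other takes over both.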
I would recommend digging into your email service provider’s delivery guarantees, as well as any idempotency / duplicate-avoidance features they might have, or whether they put the onus on you to safeguard against these scenarios.
There are some approaches to process a message exactly once (see the consume-process-produce pattern mentioned here), but it would be a shame to go through the effort and take the performance hit of implementing EOS within Kafka only to have the ESP potentially drop messages or allow duplicates (and probably only report back about this asynchronously). If you find that the ESP doesn’t provide any protection, then you might have to track message IDs and avoid duplicates yourself. If you’re doing that, then perhaps you can skip manually committing offsets and/or introducing the Kafka idempotent producer / transactions for exactly-once semantics - just allow for dupes, since you already have to handle them.
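If you do end up tracking message IDs yourself, the check in the consume loop is small. A sketch, assuming the order id travels as the message key and using an in-memory set as a stand-in for a durable store (in practice you’d want a database table keyed on that id so a restart doesn’t forget what was already processed):

```csharp
using System.Collections.Generic;
using Confluent.Kafka;

// Stand-in for a durable "already processed" store (a DB table in real life).
var processed = new HashSet<string>();

// Call this from the consume loop in the earlier example.
void HandleOrderShipped(ConsumeResult<string, string> result)
{
    var orderId = result.Message.Key;   // assumption: the order id is the message key

    if (!processed.Add(orderId))
        return;                         // duplicate delivery: skip sending a second email

    // SendEmail(orderId);              // placeholder for the actual email service call
}
```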
It could just as easily be decrementing the inventory by the quantity purchased.
I wouldn’t want to do that multiple times either.
If I have 2 consumers with different group IDs, set up to provide concurrent processing of messages, the 2 consumers would somehow have to know that they should NOT both decrement the inventory.
It’s the concept of not doing “X” multiple times, regardless of the number of consumers in different groups, that I’m trying to understand how to implement.
group.id - yep, got that. My wording wasn’t clear.
If the consumer starts cold, say after a new deployment, it will have to figure out where it last left off. The consumer wouldn’t have a last committed offset. My understanding is that I would configure it to start from the earliest message, but then I would have to determine somehow whether each message has been seen/processed previously.
OK, I see - I’d generalize to considering dupe processing end to end. Is the right-hand end Kafka or something else? A different persistent data store? An API call? If the right-hand end isn’t Kafka, then what guarantees does it offer / what idempotency features does it have? Those are questions worth considering now, since the answers may impact the approach to take within the Kafka “walls”.
Can you give more details on the use case and why you want the group IDs to be different? It sounds like a good case to have one consumer group ID so that the 2 consumers share the load in such a way that each partition’s data only goes to one consumer.
The consumer would begin from the last committed offset for the partition. Or, if there aren’t any committed offsets yet (the group was just created and consumption begins) then there is config to dictate where to start (auto.offset.reset) or you can explicitly set a specific offset to start from if you don’t want earliest or latest.
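With the .NET client those two options look roughly like this (broker address, group ID, partition, and offset values are just placeholders):

```csharp
using Confluent.Kafka;

// 1) Let the group decide: auto.offset.reset is only used when the group
//    has no committed offsets yet for a partition.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "inventory-service",
    AutoOffsetReset = AutoOffsetReset.Earliest // or AutoOffsetReset.Latest
};

// 2) Or take manual control and start a specific partition at a specific offset.
using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Assign(new TopicPartitionOffset("OrderShipped", new Partition(0), new Offset(42)));
```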
This docs page is a good resource with more info on consumer groups and offset management.
That’s helpful. I think it’d help to view the 3 consuming use cases as services (inventory management, billing, email), each having a distinct group ID, so that you meet the requirement that each message is seen once by each consumer type. From a service deployment perspective you’ll eventually have to think about physical servers / infrastructure, but from a system design perspective, when thinking about how to map the overall system to Kafka concepts like topics and consumer groups, you can set aside where the services are running for now. Even the number of topic partitions and how many instances of each service (and consumers in the consumer group(s)) to run can largely be set aside for now as you work through the service(s) / topic design.
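Concretely, the only Kafka-side difference between the three services is the GroupId - they all subscribe to the same topic, and each group independently sees every message. A sketch (group names and broker address are just for illustration; each consumer would really live in its own service/deployment):

```csharp
using Confluent.Kafka;

IConsumer<string, string> BuildConsumer(string groupId)
{
    var config = new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = groupId,                  // distinct group per service type
        AutoOffsetReset = AutoOffsetReset.Earliest
    };
    var consumer = new ConsumerBuilder<string, string>(config).Build();
    consumer.Subscribe("OrderShipped");     // all three groups read the same topic
    return consumer;
}

// In reality each of these lives in its own service; shown together only for illustration.
var emailConsumer     = BuildConsumer("email-service");
var billingConsumer   = BuildConsumer("billing-service");
var inventoryConsumer = BuildConsumer("inventory-service");
```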
I like this blog for thinking about consumer/producer-based microservices. It’s a toy pizza-making example in Python, and the processing is serial rather than a parallel fanout, which might not be what you want, but I think it’s a good microservices example that you can tweak to get different service ordering or other properties. E.g., instead of a serial setup where the services hand off to different topics (the sauce service consumes from pizza and produces to pizza-with-sauce, the cheese service consumes from pizza-with-sauce and produces to pizza-with-cheese, etc.), you might have the 3 services all consuming from the same orders topic.
Going back to the error handling and dupes, I’d recommend checking out this blog on the dual write problem and patterns for solving it. It sounds like you need to think about that for the inventory management. You might already be facing and handling this in the RabbitMQ-based status quo - i.e., what happens if things go wrong between acknowledging message consumption to RabbitMQ and decrementing inventory in the DB? That can be tricky to solve, and you might have to settle on an eventual consistency approach as described in that blog, or rely on the database to store Kafka consumer offsets so that inventory updates and offset management can be handled in a single DB transaction.
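The “store offsets in the database” variant looks roughly like this with the .NET client: auto-commit off, offsets loaded from your own table when partitions are assigned, and the inventory update plus the offset write committed in one DB transaction. This is only a sketch - BeginDbTransaction, LoadOffsetFromDb, DecrementInventory, and SaveOffsetToDb are hypothetical helpers over whatever database/ORM you use:

```csharp
using System.Linq;
using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "inventory-service",
    EnableAutoCommit = false                 // offsets live in the DB, not in Kafka
};

using var consumer = new ConsumerBuilder<string, string>(config)
    .SetPartitionsAssignedHandler((c, partitions) =>
        // On (re)assignment, resume each partition from the offset stored in the DB
        // (LoadOffsetFromDb is a hypothetical helper returning the next offset to read).
        partitions.Select(tp => new TopicPartitionOffset(tp, LoadOffsetFromDb(tp))))
    .Build();

consumer.Subscribe("OrderShipped");

while (true)
{
    var result = consumer.Consume(CancellationToken.None);

    // Hypothetical DB helpers: the business update and the offset write
    // either both commit or both roll back.
    using var tx = BeginDbTransaction();
    DecrementInventory(result.Message.Value, tx);
    SaveOffsetToDb(result.TopicPartition, result.Offset.Value + 1, tx); // next offset to read
    tx.Commit();
}
```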
The other area that I’d dig into is synchronization among these services. If you have one orders topic and three microservices that consume from it, in general you can’t guarantee anything about the order or relative timing in which the three services process a particular order. For a given order, inventory might get updated, then the email sent, and then the credit card charged a while later; or any other combination. When things go wrong (say the credit card API is down) it can get even more out of sync. If some level of synchronization among these services is desired, you might actually want to consider serial processing as in the pizza example. This is another area where it’s worth looking at the RabbitMQ-based status quo - what sync / ordering properties does it have, what happens in failure scenarios, etc. Do you want to keep those same properties or revisit them?
I forgot to mention “Queues for Kafka”, which is coming soon (early access in Apache Kafka 4.0.0). This is a good blog on the motivation (tl;dr: sacrifice ordering within a topic partition in exchange for not having to think as hard about partitioning), as well as the API and which queue behaviors it does and doesn’t cover. Queues for Kafka isn’t necessary for your use case, but I have to mention it in a post about “making Kafka act like a Queue”.