Event sourcing with Kafka

What is the best practice for structuring messages in a topic that contains different event types whose order must be preserved?

Example

Topic: user-events
Event types: UserCreatedEvent, UserUpdatedEvent, UserDeletedEvent

These events need to be written to the same topic and partition to guarantee ordering.

Possible solutions I see

  1. A single schema containing all event type fields
  2. A wrapper schema containing the schemas of all event types, e.g. {eventId, timestamp, userCreated: {}, userUpdated: {}, userDeleted: {}}
  3. A different schema per event type, combined with an Avro union (sketched after the cons list below)

Pros

  1. Easy to implement and process as a stream
  2. Easy to implement, process as a stream, and set up required fields for each event type
  3. Every message is an event

Cons

  1. Many fields may end up empty, and it’s not possible to specify required fields per event type
  2. The message type isn’t clear without inspecting the payload
  3. Difficult to deserialize (you end up working with GenericRecord)
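For illustration, option 3 with an Avro union as the top-level value schema could look like this (just a sketch; the namespace and any fields beyond eventId/timestamp are made up):

```json
[
  {
    "type": "record",
    "name": "UserCreatedEvent",
    "namespace": "com.example.users",
    "fields": [
      { "name": "eventId", "type": "string" },
      { "name": "timestamp", "type": "long" },
      { "name": "name", "type": "string" }
    ]
  },
  {
    "type": "record",
    "name": "UserUpdatedEvent",
    "namespace": "com.example.users",
    "fields": [
      { "name": "eventId", "type": "string" },
      { "name": "timestamp", "type": "long" },
      { "name": "name", "type": "string" }
    ]
  },
  {
    "type": "record",
    "name": "UserDeletedEvent",
    "namespace": "com.example.users",
    "fields": [
      { "name": "eventId", "type": "string" },
      { "name": "timestamp", "type": "long" }
    ]
  }
]
```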

Are there other possible solutions? How do you normally handle a topic with different message types, and how do you process this kind of topic?

Any reference to a code example is welcome.

Thanks

I don’t have any specific recommendations, but these two blog posts might give you some ideas:

Apologies if you’ve already read them. 🙂

Hi Dave,
Thanks for the reply.

Yes, I’ve already read those articles, and I have to say they only give a partial answer.

The first explains when it’s a good idea to store different types in the same topic, and that event sourcing is a good fit.
The second is more technical and illustrates the possibility of doing this with an Avro union.

But neither of them explains in detail how to do it with a real example.

I have seen projects on GitHub that simplified the scenario by creating a single schema, more a state than an actual event (point 1).
Talking with someone who has some experience using Kafka, we came up with the solution explained in point 2, nesting the events into a “carrying event”.

Yesterday I managed to use an Avro union, deserialize the events as GenericRecord, and apply transformations based on the event type (I will share the solution ASAP).
Since I hadn’t seen any similar solution, I was curious whether I was missing something, like drawbacks (e.g. ksqlDB doesn’t support different types) or better practices for doing the same in Kafka.
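To give an idea before I share the full solution, here is a minimal sketch of that approach in Java with Kafka Streams and Confluent’s GenericAvroSerde (the server and registry URLs and the println handling are placeholders; the event names are from the example above):

```java
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class UserEventsProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-events-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, GenericRecord> events = builder.stream("user-events");

        events.foreach((key, record) -> {
            // With a top-level union, each deserialized GenericRecord carries
            // the schema of the concrete event type that was written.
            switch (record.getSchema().getName()) {
                case "UserCreatedEvent":
                    System.out.println("created: " + record.get("eventId"));
                    break;
                case "UserUpdatedEvent":
                    System.out.println("updated: " + record.get("eventId"));
                    break;
                case "UserDeletedEvent":
                    System.out.println("deleted: " + record.get("eventId"));
                    break;
                default:
                    break; // unknown type: skip
            }
        });

        new KafkaStreams(builder.build(), props).start();
    }
}
```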

I used several Avro message types before, and that worked quite nicely, as in GitHub - gklijs/bob2021: Repo used for a tutorial during BOB 2021, https://bobkonf.de/2021/klijs.html. I also went full-on using Kafka there, which has some downsides. Are you sure you really need Avro? With protobuf you can easily define multiple message types in one proto file and use those for your topic. This means you don’t need unions, and under the hood the different types are efficiently encoded. It has the major advantage of making it easy to reuse messages as part of other messages. That’s also possible with Avro using references and/or unions, but I feel it’s a lot more difficult.
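As a hypothetical sketch (not taken from the repo above), several top-level message types in one proto file could look like this:

```protobuf
syntax = "proto3";

package user.events;

// Three top-level message types sharing one file; with the Confluent
// protobuf serde they can share a topic without needing a union.
message UserCreated {
  string event_id = 1;
  int64 timestamp = 2;
  string name = 3;
}

message UserUpdated {
  string event_id = 1;
  int64 timestamp = 2;
  string name = 3;
}

message UserDeleted {
  string event_id = 1;
  int64 timestamp = 2;
}
```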

This is how I solved deserialization of different Avro schemas in the same topic. I still have a problem with the type Date.
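For context on the Date problem: with a GenericRecord, a field declared with Avro’s date logical type comes back as a plain int (days since the Unix epoch) unless logical-type conversions are enabled, so converting it by hand looks roughly like this sketch (the field name is hypothetical):

```java
import org.apache.avro.generic.GenericRecord;

import java.time.LocalDate;

public class DateFieldExample {
    // Avro’s "date" logical type is encoded as an int counting days
    // since 1970-01-01; convert it back to a LocalDate manually.
    static LocalDate dateField(GenericRecord record, String fieldName) {
        Object raw = record.get(fieldName); // e.g. "birthDate" (hypothetical)
        return LocalDate.ofEpochDay(((Number) raw).longValue());
    }
}
```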

Any reason not to use specific classes and just use instanceof to get the class?

Can you give an example?

This is an example in Kotlin with Spring Cloud: obm_confluent_blog/MoneyTransferProcessor.kt at 691d0a70b60b79829be9e2b9ea0a41dda65d35eb · gklijs/obm_confluent_blog · GitHub. It’s using a specific serde: obm_confluent_blog/application.yml at 691d0a70b60b79829be9e2b9ea0a41dda65d35eb · gklijs/obm_confluent_blog · GitHub. There are different ways to do this. It’s also important to have the compiled classes on the classpath. In the example this is done via obm_confluent_blog/avro_compile.clj at 691d0a70b60b79829be9e2b9ea0a41dda65d35eb · gklijs/obm_confluent_blog · GitHub, including the result as a dependency.

I understand what you mean. I did try that way first, but I was getting AvroTypeExceptions.
Example:
if (eventLocation instanceof EventPatientCheckedIn)

Yes, so you should use the correct serde and/or set the correct setting, and supply the compiled classes. That makes it a lot easier.

I’m using GenericAvroDeserializer; what do you mean by using the correct serde? Is there a way to know the specific serde for each topic message that I’m reading? Your example seems a bit different, because you process different topics, but each topic has only one schema type and a specific serde. Or am I missing something?

There are a bunch of settings, I guess. You can set (.put KafkaAvroDeserializerConfig/SPECIFIC_AVRO_READER_CONFIG "true") like I did in the KafkaAvroDeserializer.

As you can see in schema-registry/avro-serde/src/main/java/io/confluent/kafka/streams/serdes/avro at master · confluentinc/schema-registry · GitHub, there are a bunch of them.
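In plain Java, the equivalent of that setting looks roughly like this sketch (the registry URL is a placeholder):

```java
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

import java.util.HashMap;
import java.util.Map;

public class SpecificReaderExample {
    public static KafkaAvroDeserializer specificDeserializer() {
        Map<String, Object> config = new HashMap<>();
        config.put("schema.registry.url", "http://localhost:8081"); // placeholder
        // Return generated SpecificRecord classes instead of GenericRecord,
        // so instanceof checks against the generated classes work.
        config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        deserializer.configure(config, false); // false = value (not key) deserializer
        return deserializer;
    }
}
```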
