Send messages from one Kafka topic to another

Dear all,
Is it possible to forward messages from one topic to another topic on Confluent Cloud? If yes, how can I do this?

Example of a use case: I have a DLQ (dead-letter queue) topic and want to move its messages to a retry topic.

Thank you,

Kafka Streams or ksqlDB is what you’re looking for, I assume.

Can’t you just read from the DLQ for your retry logic?

Thank you for the responses. Indeed, we might use this approach instead. It looks like it will be the simplest solution. In any case, I will take a look at how I can move messages from one topic to another using Kafka Streams or ksqlDB.

Hi again.
I spent some time on ksqlDB and was able to send events from one topic to another. The PoC was validated using events consisting of small JSON documents (i.e. having one or two fields). However, in our use case we work with JSON documents that have many fields, and it would be nice if we could do this transfer without having to add a schema to the Schema Registry or declare the columns during stream creation. Googling around, I didn’t find a way to overcome this dependency. Is it possible to do this without the column/schema declaration?

Can you elaborate a bit again on what the use case is here? From what I’ve understood so far I don’t understand the need to copy messages between topics - just consume from the same topic with multiple consumers as required.

The goal is that our operators can reprocess the DLQs on demand. We would like a manual solution where we have full control over which topics are being processed. Thus, an approach that only requires copying and pasting a few commands into Confluent to transfer events from one topic (the DLQ) to another (i.e. the retry topic) would work just fine for us.
We briefly explored the proposal of adding a new consumer to consume the messages from the DLQs. As a manual approach is needed, it will require some implementation in our code base, including the ability to stop and resume consumption of the DLQs, along with the creation of new endpoints. We cannot just leave the consumers running day and night because an infinite loop might occur, in which messages are sent to the retry topics, the microservices somehow cannot process them, and consequently they go to the DLQs again. That said, we will resort to this solution if nothing else works.
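
A minimal sketch of one way to guard against the loop described above, assuming each message carries a retry counter in a header. The header name `x-retry-count`, the function name, and the cap of 3 are all hypothetical and not something Kafka or Confluent defines. Headers are modelled as a list of `(str, bytes)` tuples, which is the shape the confluent-kafka Python client returns.

```python
RETRY_HEADER = "x-retry-count"  # hypothetical header name, not a Kafka standard


def bump_retry_count(headers, max_retries=3):
    """Return headers with the retry counter incremented, or None once the cap is hit.

    `headers` is a list of (str, bytes) tuples; None means "no headers".
    """
    headers = list(headers or [])
    count = 0
    for name, value in headers:
        if name == RETRY_HEADER and value:
            count = int(value.decode("ascii"))
    if count >= max_retries:
        # Give up: the caller should leave this message in the DLQ.
        return None
    # Drop the old counter and append the incremented one.
    kept = [(n, v) for n, v in headers if n != RETRY_HEADER]
    kept.append((RETRY_HEADER, str(count + 1).encode("ascii")))
    return kept
```

Whatever moves a message from the DLQ to the retry topic could call this first and skip the message when it returns `None`, so a message that keeps failing stops circulating after a fixed number of attempts.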

If it’s a manual process, can’t you just stop/start a consumer against the DLQ topic as required?

BTW: my insistence on following this path is not because copying data between topics is impossible (it is possible), but because it shouldn’t be necessary. Kafka itself is designed to hold events in one place for multiple independent logical applications to make use of them. To go against that without a good reason (which I can’t see yet) would not be a good idea IMHO.

Yes, it is feasible to stop/start the consumers as required. However, we would need to expose some endpoints and create a web page to use them. As our team uses the Confluent interface regularly, it would be nice to leverage an out-of-the-box feature to solve this problem. For instance, execute some commands in ksqlDB and then just let the events move from one topic to another. I was able to do this using ksqlDB; nonetheless, I would like to know if it is possible to create a stream without having to declare the columns or schema of the data/event.

@pedrorezende

  • ksqlDB is designed to process event streams so defining the schema of the events on a topic is necessary to enable that. For example, you might want to filter the events on the DLQ topic prior to routing them to the retry topic, but in order to do that, ksqlDB needs to understand the data format to apply the filtering. I’m not aware of a command in ksqlDB that will allow a byte-for-byte only copy of topics, but I will pass this question / idea along.
  • Like @rmoff said, another option is to configure your consumers to read directly from the DLQ topic. One reason to choose this is that replicated data carries additional cost to store, process, etc.
  • However, if I understand correctly, you want to implement a manual process for initiating the retry logic. In this case, a different direction you could explore is sinking the events into a database using Kafka Connect (Confluent Cloud has many managed options for this), and then you could build your manual retry process around rows in a database instead of trying to adapt stream processing to your use case. When things fail, they end up in the DLQ and then in the table via Kafka Connect. When your operator wants to retry a failed event, they “pop” it off the table and retry starting the process over again. This would of course require additional engineering.
  • Traditionally, users may have chosen Apache Kafka MirrorMaker or Confluent Replicator to make a copy. Either of these will require you to manage an application outside of Confluent Cloud.
  • If you’d like to create an exact copy of a topic, another potential option is Confluent Cloud Cluster Linking and Mirror Topics. I’m not certain about making copies of topics on the same cluster, but if you’re using Confluent Cloud maybe specialized clusters are an additional option to consider.
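
For the byte-for-byte copy mentioned in the first bullet, a plain consumer/producer pair needs no schema because it never decodes the payload. Below is a rough sketch of a one-shot, bounded copy. It assumes objects that behave like the confluent-kafka Python client's `Consumer` and `Producer` (`poll`, `produce`, `flush`, and the message accessors). The function name `copy_until`, its parameters, and the idea of passing in a snapshot of end offsets are illustrative choices, not an existing tool.

```python
def copy_until(consumer, producer, target_topic, end_offsets, max_idle_polls=5):
    """Forward raw key/value/headers until every partition reaches its end offset.

    `end_offsets` maps partition -> exclusive end offset, i.e. the high
    watermark captured before the run. Returns the number of messages forwarded.
    """
    pending = {p for p, end in end_offsets.items() if end > 0}
    copied = 0
    idle = 0
    # The idle cap is a safety net for partitions that have nothing left to read.
    while pending and idle < max_idle_polls:
        msg = consumer.poll(1.0)
        if msg is None:
            idle += 1
            continue
        idle = 0
        if msg.error():
            raise RuntimeError(str(msg.error()))
        partition, offset = msg.partition(), msg.offset()
        end = end_offsets.get(partition, 0)
        if offset < end:
            # Bytes are passed through untouched, so no schema is involved.
            kwargs = {"key": msg.key(), "value": msg.value()}
            if msg.headers():
                kwargs["headers"] = msg.headers()
            producer.produce(target_topic, **kwargs)
            producer.poll(0)  # serve delivery callbacks
            copied += 1
        if offset >= end - 1:
            # Snapshot reached: anything newer waits for the next manual run.
            pending.discard(partition)
    producer.flush()
    return copied
```

Because the run stops at the offsets captured up front, messages that fail again and land back in the DLQ during the run are not picked up until an operator starts another run. With confluent-kafka, the snapshot could be built from `Consumer.get_watermark_offsets` for each assigned partition. This still means running a small script outside Confluent Cloud.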