How to divide/parse topics in kafka to another topics by condition in real time?

I am using Kafka connect to send elasticsearch data to kafka. Once the connector is running, a topic is automatically created whose name is the elasticsearch index followed by a prefix. Now, I would like to split this topic into 3 topics by condition
I am using Kafka connect to send elasticsearch data to kafka. Once the connector is running, a topic is automatically created whose name is the elasticsearch index followed by a prefix. Now, I would like to split this topic into N topics by condition

all my output kafka topic is like this:

{"schema":
    {"type":"struct",
    "fields":[
        {"type":"string","optional":true,"field":"nature"},
        {"type":"string","optional":true,"field":"description"},
        {"type":"string","optional":true,"field":"threshold"},
        {"type":"string","optional":true,"field":"quality"},
        {"type":"string","optional":true,"field":"rowid"},
        {"type":"string","optional":true,"field":"avrotimestamp"},
        {"type":"array","items":{"type":"string","optional":true},"optional":true,"field":"null"},
        {"type":"string","optional":true,"field":"domain"},
        {"type":"string","optional":true,"field":"name"},
        {"type":"string","optional":true,"field":"avroversion"},
        {"type":"string","optional":true,"field":"esindex"},
        {"type":"string","optional":true,"field":"value"},
        {"type":"string","optional":true,"field":"chrono"},
        {"type":"string","optional":true,"field":"esid"},
        {"type":"string","optional":true,"field":"ts"}],"optional":false,"name":"demofilter"},
    "payload":
    {
        "nature":"R01",
        "description":"Energy",
        "threshold":"","quality":"192",
        "rowid":"34380941",
        "avrotimestamp":"2022-09-20T04:00:11.939Z",
        "null":["TGT BQ 1B"],
        "domain":"CFO",
        "name":"RDC.R01.RED.MES",
        "avroversion":"1",
        "esindex":"demo_filter",
        "value":"4468582",
        "chrono":"133081200000000000",
        "esid":"nuWIrYMBHyNMgyhJYscV",
        "ts":"2022-09-20T02:00:00.000Z"
    }
}

the description field takes several values ​​but should contain one of these keywords: energy, electric, and temperature (example: life energy, body temperature, car energy)

the goal is that when the description field has the energy keyword, the data must be sent to the energy topic and so on, all in real time of course

what i was looking for:

  1. according to my research kafka stream is an option, unfortunately with the wordcount example I can’t figure out how I can do it. (I’m learning kafka stream for data processing).
  2. use python to sort after consuming the data but it takes time and loses the word in real time

What should I do?

You can use a TopicNameExtractor in the to transformation. See KStream (kafka 3.3.1 API)