I am using Kafka Connect to send Elasticsearch data to Kafka. Once the connector is running, a topic is automatically created whose name is the Elasticsearch index preceded by a prefix. Now I would like to split this topic into 3 topics based on a condition.
Every message in my output Kafka topic looks like this:
{"schema":
{"type":"struct",
"fields":[
{"type":"string","optional":true,"field":"nature"},
{"type":"string","optional":true,"field":"description"},
{"type":"string","optional":true,"field":"threshold"},
{"type":"string","optional":true,"field":"quality"},
{"type":"string","optional":true,"field":"rowid"},
{"type":"string","optional":true,"field":"avrotimestamp"},
{"type":"array","items":{"type":"string","optional":true},"optional":true,"field":"null"},
{"type":"string","optional":true,"field":"domain"},
{"type":"string","optional":true,"field":"name"},
{"type":"string","optional":true,"field":"avroversion"},
{"type":"string","optional":true,"field":"esindex"},
{"type":"string","optional":true,"field":"value"},
{"type":"string","optional":true,"field":"chrono"},
{"type":"string","optional":true,"field":"esid"},
{"type":"string","optional":true,"field":"ts"}],"optional":false,"name":"demofilter"},
"payload":
{
"nature":"R01",
"description":"Energy",
"threshold":"","quality":"192",
"rowid":"34380941",
"avrotimestamp":"2022-09-20T04:00:11.939Z",
"null":["TGT BQ 1B"],
"domain":"CFO",
"name":"RDC.R01.RED.MES",
"avroversion":"1",
"esindex":"demo_filter",
"value":"4468582",
"chrono":"133081200000000000",
"esid":"nuWIrYMBHyNMgyhJYscV",
"ts":"2022-09-20T02:00:00.000Z"
}
}
The description field can take many values, but it always contains one of these keywords: energy, electric, or temperature (for example: life energy, body temperature, car energy).
The goal is that when the description field contains the energy keyword, the record is routed to the energy topic, and so on, all in real time of course.
What I have looked into:
- According to my research, Kafka Streams is an option; unfortunately, starting from the WordCount example I can't figure out how to do this (I'm still learning Kafka Streams for data processing).
- Consuming the data with Python and re-producing it to the right topic, but that adds latency and is no longer truly real time.
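To make the condition concrete, here is the routing rule I have in mind as a plain, broker-free Java sketch (the topic names and the fallback topic are placeholders I made up). My understanding is that in Kafka Streams this kind of function could be plugged into `KStream#to(TopicNameExtractor)` so each record is sent to a topic chosen per record, but I am not sure this is the right approach:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TopicRouter {

    // Hypothetical keyword -> topic mapping; topic names are placeholders.
    private static final Map<String, String> ROUTES = new LinkedHashMap<>();
    static {
        ROUTES.put("energy", "energy-topic");
        ROUTES.put("electric", "electric-topic");
        ROUTES.put("temperature", "temperature-topic");
    }

    /**
     * Returns the destination topic for a record, based on which keyword
     * appears in its "description" field (case-insensitive substring match,
     * first match wins).
     */
    public static String topicFor(String description) {
        if (description != null) {
            String d = description.toLowerCase();
            for (Map.Entry<String, String> e : ROUTES.entrySet()) {
                if (d.contains(e.getKey())) {
                    return e.getValue();
                }
            }
        }
        // Fallback for records containing none of the keywords.
        return "unmatched-topic";
    }

    public static void main(String[] args) {
        System.out.println(topicFor("life energy"));      // energy-topic
        System.out.println(topicFor("body temperature")); // temperature-topic
    }
}
```

In a Kafka Streams topology I imagine the wiring would look roughly like `stream.to((key, value, ctx) -> TopicRouter.topicFor(/* description extracted from value */))`, where extracting the description from the Connect schema+payload JSON is still up to me.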
What should I do?