Hi
I am using the Kafka-connect ES sink library to write Kafka messages into ES.
now, I want to change the target index name in ES, I’m following this video but Index is created with the default topic name in ES.
Here is my configuration.
topics = test-kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
schema.ignore= true
key.ignore=false
transforms=changeIndexname
transforms.changeIndexname.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.changeIndexname.regex=(.*)-kafka
transforms.changeIndexname.replacement=foo-$1
I’m also using another transform for changing _id value and it’s working perfectly.
transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id
but I can’t change the Index name.
Now If I use dropPrefix like this
transforms=dropPrefix
transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex=test-(.*)
transforms.dropPrefix.replacement=$1
but it still didn’t work. but If I remove the property of transforms=InsertKey,ExtractId
Index renames successfully. Why can’t I use transforms=InsertKey,ExtractId
and transforms=dropPrefix
in same configuration?
rmoff
12 April 2021 13:40
2
It’s hard to say for sure because you’ve not shared the full configuration of each permutation that you’re describing, but two possibilities occur to me:
Your topic doesn’t match the regex (.*)-kafka
If you are combining transforms you have to do so in one single tranform
expression, i.e.
transforms=dropPrefix,InsertKey,ExtractId
not
transforms=InsertKey,ExtractId
transforms=dropPrefix
If neither of these are the answer then please can you share (a) the full configuration that isn’t working (you shared one, but that doesn’t include the InsertKey,ExtractId
that you also mention) and (b) the name of the topic that you’re reading from
1 Like
Thanks, Robin. the actual solution is point 2. This is my entire configuration file
name = es-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
input.data.format=JSON
output.data.format=JSON
topics = test-kafka
connection.url = http://192.168.0.245:9200
key.converter=org.apache.kafka.connect.storage.StringConverter
#schema.ignore=true
key.ignore=false
# Write configuration
behavior.on.null.values=DELETE
transforms=InsertKey,ExtractId,dropPrefix
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id
transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex=test-(.*)
transforms.dropPrefix.replacement=$1
1 Like
system
Closed
19 April 2021 15:34
4
This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.