Kafka Connect ES sink: change target index name

Hi

I am using the Kafka Connect ES sink connector to write Kafka messages into ES.

Now I want to change the target index name in ES. I’m following this video, but the index is still created with the default topic name in ES.

Here is my configuration.

    topics=test-kafka
    key.converter=org.apache.kafka.connect.storage.StringConverter
    schema.ignore=true
    key.ignore=false

    transforms=changeIndexname
    transforms.changeIndexname.type=org.apache.kafka.connect.transforms.RegexRouter
    transforms.changeIndexname.regex=(.*)-kafka
    transforms.changeIndexname.replacement=foo-$1
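For reference, RegexRouter only renames the destination when the pattern matches the whole topic name, so with the config above test-kafka should match (.*)-kafka and be routed to foo-test. A quick way to sanity-check a pattern is a small sketch like this (Python approximating the Java regex semantics used by the SMT):

```python
import re

# RegexRouter renames only when the regex matches the ENTIRE topic name
# (Java's Matcher.matches()); re.fullmatch approximates that here.
def route(topic, regex, replacement):
    m = re.fullmatch(regex, topic)
    # Python back-references use \1 where Java uses $1
    return m.expand(replacement.replace("$1", r"\1")) if m else topic

print(route("test-kafka", "(.*)-kafka", "foo-$1"))   # foo-test
print(route("other-topic", "(.*)-kafka", "foo-$1"))  # unchanged: other-topic
```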

I’m also using another transform to change the _id value, and it works perfectly.

    transforms=InsertKey,ExtractId
    transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
    transforms.InsertKey.fields=id
    transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
    transforms.ExtractId.field=id
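For anyone following along, this is roughly what that pair of SMTs does to each record (an illustrative Python sketch with an assumed record shape, not the actual Connect API):

```python
# Illustrative sketch only -- the real SMTs operate on Connect records.
def insert_key(record, fields):
    # ValueToKey: build the record key from the named fields of the value
    record["key"] = {f: record["value"][f] for f in fields}
    return record

def extract_id(record, field):
    # ExtractField$Key: replace the whole key struct with one of its fields
    record["key"] = record["key"][field]
    return record

rec = {"key": None, "value": {"id": "42", "name": "example"}}
rec = extract_id(insert_key(rec, ["id"]), "id")
print(rec["key"])  # prints: 42
```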

But I can’t change the index name.

Now, if I use dropPrefix like this:

    transforms=dropPrefix
    transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
    transforms.dropPrefix.regex=test-(.*)
    transforms.dropPrefix.replacement=$1

it still doesn’t work. But if I remove the transforms=InsertKey,ExtractId property, the index is renamed successfully. Why can’t I use transforms=InsertKey,ExtractId and transforms=dropPrefix in the same configuration?

It’s hard to say for sure because you’ve not shared the full configuration of each permutation that you’re describing, but two possibilities occur to me:

  1. Your topic doesn’t match the regex (.*)-kafka

  2. If you are combining transforms you have to do so in a single transforms expression, i.e.

    transforms=dropPrefix,InsertKey,ExtractId
    

    not

    transforms=InsertKey,ExtractId
    transforms=dropPrefix
    

If neither of these is the answer, then please can you share (a) the full configuration that isn’t working (you shared one, but that doesn’t include the InsertKey,ExtractId transforms that you also mention) and (b) the name of the topic that you’re reading from?
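The reason point 2 matters: in a Java .properties file a repeated key doesn’t merge, the last assignment silently wins. A minimal sketch of that parsing behaviour (simplified, ignoring escapes and line continuations):

```python
# In a .properties file the last assignment to a key wins, so two
# transforms= lines don't combine -- the second replaces the first.
def parse_properties(text):
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, value = line.split("=", 1)
            props[key.strip()] = value.strip()
    return props

conf = parse_properties("""
transforms=InsertKey,ExtractId
transforms=dropPrefix
""")
print(conf["transforms"])  # dropPrefix -- the first line was overwritten
```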


Thanks, Robin. The actual solution was point 2. This is my entire configuration file:

    name=es-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1

    input.data.format=JSON
    output.data.format=JSON

    topics=test-kafka

    connection.url=http://192.168.0.245:9200

    key.converter=org.apache.kafka.connect.storage.StringConverter
    #schema.ignore=true
    key.ignore=false

    # Write configuration
    behavior.on.null.values=DELETE

    transforms=InsertKey,ExtractId,dropPrefix
    transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
    transforms.InsertKey.fields=id
    transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
    transforms.ExtractId.field=id
    transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
    transforms.dropPrefix.regex=test-(.*)
    transforms.dropPrefix.replacement=$1
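Putting it together, the single chain applies the three transforms in order. An end-to-end sketch of the net effect (assumed record shape, Python approximating the Java regex semantics):

```python
import re

# Sketch of the working chain: InsertKey,ExtractId set the record key to
# value["id"], then dropPrefix routes "test-kafka" to index "kafka".
# Illustrative only -- not the Connect API.
def apply_chain(topic, value):
    key = {"id": value["id"]}           # InsertKey (ValueToKey)
    key = key["id"]                     # ExtractId (ExtractField$Key)
    m = re.fullmatch(r"test-(.*)", topic)
    index = m.group(1) if m else topic  # dropPrefix (RegexRouter)
    return index, key

index, key = apply_chain("test-kafka", {"id": "42"})
print(index, key)  # kafka 42
```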
