Need to add 16-digit UUID to Kafka header

Hi Team

I have an HTTP client connector. Before sending data to Kafka, I want to generate a 16-digit random UUID for each record and add it to the header. How can I achieve this?

hey @snehil2209

welcome :)

maybe you’re looking for something like this

best,
michael

Thanks for the reply. Is there any way I can generate a 16-digit random ID and use the above transformer to add it to the header?
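For the ID itself, here is a minimal sketch in plain Java, independent of any Kafka Connect API. A standard UUID is 128 bits (32 hex digits), so a 16-digit value is really a random numeric ID rather than a true UUID. The class name RandomId16 and the method generate() are hypothetical names for illustration, and the non-zero first digit is an assumption so the value keeps 16 digits if it is later parsed as a number.

```java
import java.security.SecureRandom;

public class RandomId16 {
    // SecureRandom is thread-safe, so one shared instance is enough
    private static final SecureRandom RANDOM = new SecureRandom();

    /** Returns a string of exactly 16 decimal digits; the first digit is never 0. */
    public static String generate() {
        StringBuilder sb = new StringBuilder(16);
        sb.append(1 + RANDOM.nextInt(9));      // first digit: 1..9
        for (int i = 1; i < 16; i++) {
            sb.append(RANDOM.nextInt(10));     // remaining digits: 0..9
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(generate());
    }
}
```

Random IDs of this size are not guaranteed to be unique, so this only fits if occasional collisions are acceptable.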

did you check

I guess this one might be helpful

best,
michael

Thanks for the link, but I'm getting the error below after implementing it:

at java.base/java.lang.Thread.run(Thread.java:829)

Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:190)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:198)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:82)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:120)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:739)
… 10 more
Caused by: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)

Login exceptions would be unrelated to usage of any Connector transform.

Please show your connector client configurations

Thanks.
I see the error below now:
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [adding UUID to record], found: java.lang.String
at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
at ews.edap.uuid.SaferPaymentUUIDGeneratorTransformer.applyWithSchema(SaferPaymentUUIDGeneratorTransformer.java:75)
at ews.edap.uuid.SaferPaymentUUIDGeneratorTransformer.apply(SaferPaymentUUIDGeneratorTransformer.java:60)

As the error says, Only Struct objects supported ... found: java.lang.String

Please try a config like transforms=hoist,insertuuid to wrap the string into a named field of a Struct, then a UUID can be added to that.

Note: This will place the UUID in the value, not the Header. The InsertHeader transform only seems to work on static values - InsertHeader | Confluent Documentation
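A rough sketch of the transform chain suggested above, as a fragment of the connector config. HoistField$Value is the stock Apache Kafka transform and "field" is its config key. The wrapper field name "payload" is a hypothetical choice, and the insertuuid class and field name are copied from the connector config posted in this thread.

```json
{
  "transforms": "hoist,insertuuid",
  "transforms.hoist.type": "org.apache.kafka.connect.transforms.HoistField$Value",
  "transforms.hoist.field": "payload",
  "transforms.insertuuid.type": "testcompany.edap.uuid.UUID$Value",
  "transforms.insertuuid.uuid.field.name": "test"
}
```

Transforms run in the order listed, so hoist has to come first. HoistField only produces a Struct when the record carries a schema. For schemaless records it produces a Map, which a Struct-only transform may still reject.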

{
  "name": "test_try1",
  "config": {
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "tasks.max": "2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "insertuuid",
    "topics": "test",
    "transforms.insertuuid.type": "testcompany.edap.uuid.UUID$Value",
    "transforms.insertuuid.uuid.field.name": "test",
    "http.api.url": "https://123:37911/",
    "request.method": "POST",
    "headers": "Content-Type:application/json|X-SP-Request-Type:ModelRequest|X-SP-Message-Type-Id:22",
    "header.separator": "|",
    "http.proxy.user": "abc",
    "http.proxy.password": "def",
    "auth.type": "NONE",
    "https.ssl.key.password": "changeit",
    "https.ssl.keystore.location": "/etc/vault-agent/keystore.jks",
    "https.ssl.keystore.password": "changeit",
    "https.ssl.truststore.location": "/opt/java/lib/security/cacerts",
    "https.ssl.truststore.password": "changeit",
    "https.ssl.keystore.type": "JKS",
    "https.ssl.protocol": "TLSv1.2",
    "https.ssl.truststore.type": "JKS",
    "reporter.result.topic.name": "test.response",
    "reporter.result.topic.replication.factor": "3",
    "reporter.result.topic.partitions": "2",
    "reporter.error.topic.name": "test.error",
    "reporter.error.topic.replication.factor": "3",
    "reporter.error.topic.partitions": "2",
    "reporter.bootstrap.servers": "",
    "confluent.topic.bootstrap.servers": "",
    "confluent.topic.replication.factor": "3",
    "principal.service.name": "c",
    "principal.service.password": "v",
    "value.converter.schema.registry.ssl.key.password": "changeit",
    "reporter.producer.sasl.kerberos.service.name": "kafka",
    "value.converter.schema.registry.ssl.keystore.password": "changeit",
    "reporter.admin.sasl.kerberos.service.name": "kafka",
    "confluent.topic.ssl.keystore.location": "/etc/vault-agent/keystore.jks",
    "reporter.producer.sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/home/testuser/.kerberos/testuser.keytab\" principal=\"testuser@testcompany.INT\";",
    "reporter.admin.ssl.key.password": "changeit",
    "reporter.producer.ssl.keystore.location": "/etc/vault-agent/keystore.jks",
    "reporter.admin.ssl.truststore.location": "/opt/java/lib/security/cacerts",
    "confluent.topic.ssl.key.password": "changeit",
    "confluent.topic.ssl.keystore.password": "changeit",
    "value.converter.schema.registry.url": "",
    "reporter.producer.ssl.truststore.location": "/opt/java/lib/security/cacerts",
    "reporter.admin.ssl.keystore.password": "changeit",
    "confluent.topic.sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/home/testuser/.kerberos/testuser.keytab\" principal=\"testuser@testcompany.INT\";",
    "value.converter.schemas.enable": "false",
    "reporter.admin.ssl.keystore.location": "/etc/vault-agent/keystore.jks",
    "ssl.enabled": "True",
    "https.sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/home/testuser/.kerberos/testuser.keytab\" principal=\"testuser@testcompany.INT\";",
    "value.converter.schema.registry.ssl.truststore.password": "changeit",
    "value.converter.schema.registry.ssl.keystore.location": "/etc/vault-agent/keystore.jks",
    "confluent.topic.ssl.truststore.location": "/opt/java/lib/security/cacerts",
    "reporter.admin.ssl.truststore.password": "changeit",
    "reporter.producer.ssl.truststore.password": "changeit",
    "reporter.producer.ssl.keystore.password": "changeit",
    "reporter.admin.security.protocol": "SASL_SSL",
    "value.converter.schema.registry.ssl.truststore.location": "/opt/java/lib/security/cacerts",
    "reporter.producer.security.protocol": "SASL_SSL",
    "https.sasl.kerberos.service.name": "kafka",
    "reporter.admin.sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/home/testuser/.kerberos/testuser.keytab\" principal=\"testuser@testcompany.INT\";",
    "confluent.topic.security.protocol": "SASL_SSL",
    "confluent.topic.ssl.truststore.password": "changeit",
    "reporter.producer.ssl.key.password": "changeit",
    "confluent.topic.sasl.kerberos.service.name": "kafka"
  }
}

I am using the above setup to add the header, but the UUID is not showing up in the message. What change do I need to make?

I suggest creating a minimal example without SSL.

As mentioned, InsertUUID$Value will try to place the data into the value of the record, not the header.
