Need to add a 16-digit UUID to a Kafka header

Hi Team

I have an HTTP client connector. Before sending data to Kafka, I want to generate a 16-digit random UUID for each record and add it to the header. How can I achieve this?

hey @snehil2209

Welcome!

maybe you’re looking for something like this

best,
michael

Thanks for the reply. Is there any way I can generate a 16-digit random ID and use the above transformer to add it to the header?

did you check

I guess this one might be helpful

best,
michael

Thanks for the above link, but I am getting the error below after implementing it:

at java.base/java.lang.Thread.run(Thread.java:829)

Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:190)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:198)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:82)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:120)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:739)
... 10 more
Caused by: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)

Login exceptions would be unrelated to usage of any Connector transform.

Please show your connector client configurations

Thanks.
I see the error below now:
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [adding UUID to record], found: java.lang.String
at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
at ews.edap.uuid.SaferPaymentUUIDGeneratorTransformer.applyWithSchema(SaferPaymentUUIDGeneratorTransformer.java:75)
at ews.edap.uuid.SaferPaymentUUIDGeneratorTransformer.apply(SaferPaymentUUIDGeneratorTransformer.java:60)

As the error says, Only Struct objects supported ... found: java.lang.String

Please try a config like transforms=hoist,insertuuid to wrap the string into a named field of a Struct, then a UUID can be added to that.

Note: This will place the UUID in the value, not the header. The InsertHeader transform only seems to work with static values (see the InsertHeader page in the Confluent Documentation).
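For illustration, the suggested chain could look like the fragment below. This is an untested sketch under a few assumptions: `HoistField$Value` and its `field` setting are the standard Kafka Connect transform, the field name `payload` is only a placeholder, and the `insertuuid` class name and its `uuid.field.name` setting are copied from the connector configuration posted in this thread, so they depend on whichever custom transform is actually deployed. Keep in mind that HoistField produces a Struct only when the record carries a schema; for schemaless records it produces a Map.

```json
{
  "transforms": "hoist,insertuuid",
  "transforms.hoist.type": "org.apache.kafka.connect.transforms.HoistField$Value",
  "transforms.hoist.field": "payload",
  "transforms.insertuuid.type": "testcompany.edap.uuid.UUID$Value",
  "transforms.insertuuid.uuid.field.name": "test"
}
```

Transforms run in the order listed, so `hoist` has to come before `insertuuid`.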

{
  "name": "test_try1",
  "config": {
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "tasks.max": "2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "insertuuid",
    "topics": "test",
    "transforms.insertuuid.type": "testcompany.edap.uuid.UUID$Value",
    "transforms.insertuuid.uuid.field.name": "test",
    "http.api.url": "https://123:37911/",
    "request.method": "POST",
    "headers": "Content-Type:application/json|X-SP-Request-Type:ModelRequest|X-SP-Message-Type-Id:22",
    "header.separator": "|",
    "http.proxy.user": "abc",
    "http.proxy.password": "def",
    "auth.type": "NONE",
    "https.ssl.key.password": "changeit",
    "https.ssl.keystore.location": "/etc/vault-agent/keystore.jks",
    "https.ssl.keystore.password": "changeit",
    "https.ssl.truststore.location": "/opt/java/lib/security/cacerts",
    "https.ssl.truststore.password": "changeit",
    "https.ssl.keystore.type": "JKS",
    "https.ssl.protocol": "TLSv1.2",
    "https.ssl.truststore.type": "JKS",
    "reporter.result.topic.name": "test.response",
    "reporter.result.topic.replication.factor": "3",
    "reporter.result.topic.partitions": "2",
    "reporter.error.topic.name": "test.error",
    "reporter.error.topic.replication.factor": "3",
    "reporter.error.topic.partitions": "2",
    "reporter.bootstrap.servers": "",
    "confluent.topic.bootstrap.servers": "",
    "confluent.topic.replication.factor": "3",
    "principal.service.name": "c",
    "principal.service.password": "v",
    "value.converter.schema.registry.ssl.key.password": "changeit",
    "reporter.producer.sasl.kerberos.service.name": "kafka",
    "value.converter.schema.registry.ssl.keystore.password": "changeit",
    "reporter.admin.sasl.kerberos.service.name": "kafka",
    "confluent.topic.ssl.keystore.location": "/etc/vault-agent/keystore.jks",
    "reporter.producer.sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/home/testuser/.kerberos/testuser.keytab\" principal=\"testuser@testcompany.INT\";",
    "reporter.admin.ssl.key.password": "changeit",
    "reporter.producer.ssl.keystore.location": "/etc/vault-agent/keystore.jks",
    "reporter.admin.ssl.truststore.location": "/opt/java/lib/security/cacerts",
    "confluent.topic.ssl.key.password": "changeit",
    "confluent.topic.ssl.keystore.password": "changeit",
    "value.converter.schema.registry.url": "",
    "reporter.producer.ssl.truststore.location": "/opt/java/lib/security/cacerts",
    "reporter.admin.ssl.keystore.password": "changeit",
    "confluent.topic.sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/home/testuser/.kerberos/testuser.keytab\" principal=\"testuser@testcompany.INT\";",
    "value.converter.schemas.enable": "false",
    "reporter.admin.ssl.keystore.location": "/etc/vault-agent/keystore.jks",
    "ssl.enabled": "True",
    "https.sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/home/testuser/.kerberos/testuser.keytab\" principal=\"testuser@testcompany.INT\";",
    "value.converter.schema.registry.ssl.truststore.password": "changeit",
    "value.converter.schema.registry.ssl.keystore.location": "/etc/vault-agent/keystore.jks",
    "confluent.topic.ssl.truststore.location": "/opt/java/lib/security/cacerts",
    "reporter.admin.ssl.truststore.password": "changeit",
    "reporter.producer.ssl.truststore.password": "changeit",
    "reporter.producer.ssl.keystore.password": "changeit",
    "reporter.admin.security.protocol": "SASL_SSL",
    "value.converter.schema.registry.ssl.truststore.location": "/opt/java/lib/security/cacerts",
    "reporter.producer.security.protocol": "SASL_SSL",
    "https.sasl.kerberos.service.name": "kafka",
    "reporter.admin.sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/home/testuser/.kerberos/testuser.keytab\" principal=\"testuser@testcompany.INT\";",
    "confluent.topic.security.protocol": "SASL_SSL",
    "confluent.topic.ssl.truststore.password": "changeit",
    "reporter.producer.ssl.key.password": "changeit",
    "confluent.topic.sasl.kerberos.service.name": "kafka"
  }
}

I am using the above setup to add the header, but it is not showing up in the message. What change do I need to make?

I suggest creating a minimal example without SSL.

As mentioned, InsertUUID$Value will try to place the data into the value of the record, not the header.
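To get the ID into a header instead, a custom transform would have to generate the value itself and attach it to the record's headers. The generation part can be sketched with the Java standard library alone. This assumes that "16 digit" means 16 decimal digits (a standard UUID is 128 bits, usually printed as 32 hex characters, so this is not a real UUID); the class name `RandomId16` is hypothetical.

```java
import java.security.SecureRandom;

// Hypothetical helper: produces random IDs of exactly 16 decimal digits.
// Not a standard UUID; uniqueness is probabilistic, not guaranteed.
class RandomId16 {
    private static final SecureRandom RANDOM = new SecureRandom();
    private static final int LENGTH = 16;

    // Builds the ID digit by digit so that leading zeros are preserved.
    static String next() {
        StringBuilder sb = new StringBuilder(LENGTH);
        for (int i = 0; i < LENGTH; i++) {
            sb.append((char) ('0' + RANDOM.nextInt(10)));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Prints one freshly generated 16-digit ID.
        System.out.println(next());
    }
}
```

Inside a custom transform's `apply()` method, such a value could then be attached with something like `record.headers().addString("uuid", RandomId16.next())`, where the header name `uuid` is only an example.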