We are facing an inconstant KafkaProducer creation error in a multithreaded production environment, and cannot fix it no matter which solutions we try:
- Loading serializers explicitly
- Resetting the class loader to null
- Using KafkaTemplate as a singleton
The same codebase works in the dev environment, which has less load, and locally.
We are running Spring Boot application with Spring Boot v3.0.0
The way we construct the producer is:
import com.ourcompany.MessageDto;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.producer.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.schema-registry}")
private String schemaRegistry;
@Bean
public KafkaTemplate<String, MessageDto> kafkaTemplate() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // this is valid server url
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
configs.put("schema.registry.url", schemaRegistry); // this is valid schema registry url
var kafkaProducer =
new DefaultKafkaProducerFactory<String, MessageDto>(configs);
return new KafkaTemplate<>(kafkaProducer);
}
}
and the error we get is:
Failed sending MessageDto eventorg.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:468)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:884)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:777)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:747)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:727)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:721)
at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:837)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:709)
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:691)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:475)
at com.ourcompany.svc.producers.MessageEventsProducer.sendMessageEvent(MessageEventsProducer.java:89)
at com.ourcompany.svc.producers.MessageEventsProducer.createMessageEvent(MessageEventsProducer.java:78)
at com.ourcompany.svc.service.MessageService.createMessage(MessageService.java:100)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:699)
at com.ourcompany.svc.service.MessageService$$SpringCGLIB$$0.createMessage(<generated>)
at com.ourcompany.svc.service.AutomaticTaggingService.reEvaluateTag(AutomaticTaggingService.java:150)
at com.ourcompany.svc.service.AutomaticTaggingService.reEvaluateAllCategoryTags(AutomaticTaggingService.java:168)
at com.ourcompany.svc.service.AutomaticTaggingService.reEvaluateProduct(AutomaticTaggingService.java:68)
at com.ourcompany.svc.job.AutomaticTaggingJob.lambda$scheduleAsyncProcesses$0(AutomaticTaggingJob.java:154)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1796)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.confluent.kafka.serializers.KafkaAvroSerializerConfig
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:396)
... 31 common frames omitted
dependencies are:
implementation "org.apache.kafka:kafka-streams"
implementation "org.springframework.kafka:spring-kafka"
implementation "org.apache.avro:avro:1.11.1"
implementation "io.confluent:kafka-avro-serializer:6.2.2"
implementation "io.confluent:kafka-schema-serializer:6.2.2"
implementation "io.confluent:kafka-schema-registry-client:6.2.2"
implementation "io.confluent:common-utils:6.2.2"
implementation "io.confluent:common-config:6.2.2"