Unable to create KafkaProducer

We are facing an intermittent KafkaProducer creation error in a multithreaded production environment, and none of the solutions we have tried fixes it:

  1. Loading serializers explicitly
  2. Resetting the class loader to null
  3. Using KafkaTemplate as a singleton

The same codebase works in the dev environment, which has less load, and locally.

We are running a Spring Boot v3.0.0 application.

The way we construct the producer is:

import com.ourcompany.MessageDto;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class KafkaConfig {
  @Value("${spring.kafka.producer.bootstrap-servers}")
  private String bootstrapServers;
  @Value("${spring.kafka.producer.schema-registry}")
  private String schemaRegistry;

  @Bean
  public KafkaTemplate<String, MessageDto> kafkaTemplate() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // this is valid server url
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    configs.put("schema.registry.url", schemaRegistry);  // this is valid schema registry url

    var kafkaProducer =
        new DefaultKafkaProducerFactory<String, MessageDto>(configs);

    return new KafkaTemplate<>(kafkaProducer);
  }
}

and the error we get is:

Failed sending MessageDto event org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:468)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:884)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:777)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:747)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:727)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:721)
    at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:837)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:709)
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:691)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:475)
    at com.ourcompany.svc.producers.MessageEventsProducer.sendMessageEvent(MessageEventsProducer.java:89)
    at com.ourcompany.svc.producers.MessageEventsProducer.createMessageEvent(MessageEventsProducer.java:78)
    at com.ourcompany.svc.service.MessageService.createMessage(MessageService.java:100)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:699)
    at com.ourcompany.svc.service.MessageService$$SpringCGLIB$$0.createMessage(<generated>)
    at com.ourcompany.svc.service.AutomaticTaggingService.reEvaluateTag(AutomaticTaggingService.java:150)
    at com.ourcompany.svc.service.AutomaticTaggingService.reEvaluateAllCategoryTags(AutomaticTaggingService.java:168)
    at com.ourcompany.svc.service.AutomaticTaggingService.reEvaluateProduct(AutomaticTaggingService.java:68)
    at com.ourcompany.svc.job.AutomaticTaggingJob.lambda$scheduleAsyncProcesses$0(AutomaticTaggingJob.java:154)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1796)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class io.confluent.kafka.serializers.KafkaAvroSerializerConfig
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:396)
    ... 31 common frames omitted

The dependencies are:

    implementation "org.apache.kafka:kafka-streams"
    implementation "org.springframework.kafka:spring-kafka"

    implementation "org.apache.avro:avro:1.11.1"
    implementation "io.confluent:kafka-avro-serializer:6.2.2"
    implementation "io.confluent:kafka-schema-serializer:6.2.2"
    implementation "io.confluent:kafka-schema-registry-client:6.2.2"
    implementation "io.confluent:common-utils:6.2.2"
    implementation "io.confluent:common-config:6.2.2"

I'm not sure this is the cause, but I had the same error when I used KafkaTemplate inside a parallel stream (JDK 11). The threads that run parallel streams use a separate ClassLoader, so the required classes could not be loaded. I worked around it by using a custom ForkJoinPool for the parallel streams that send messages via KafkaTemplate.
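The custom ForkJoinPool approach can be sketched roughly as follows. This is a minimal, JDK-only illustration, not the exact code I used: the class name ContextClassLoaderPoolSketch and the helpers newPool and mapInPool are hypothetical, and it assumes that the failure comes from worker threads carrying a different context class loader than the application. It also relies on the widely used (but, as far as I know, not formally specified) behaviour that a parallel stream started from inside a ForkJoinPool task runs on that pool's workers.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper class, for illustration only.
final class ContextClassLoaderPoolSketch {

    // Builds a pool whose worker threads use the given context class loader.
    static ForkJoinPool newPool(int parallelism, ClassLoader loader) {
        ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
            ForkJoinWorkerThread worker =
                ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setContextClassLoader(loader);
            return worker;
        };
        return new ForkJoinPool(parallelism, factory, null, false);
    }

    // Runs a parallel stream from inside the pool, so the stream's work is
    // executed by this pool's workers instead of the common pool.
    static <T, R> List<R> mapInPool(ForkJoinPool pool, List<T> items,
                                    Function<? super T, ? extends R> fn) {
        Callable<List<R>> task =
            () -> items.parallelStream().map(fn).collect(Collectors.toList());
        return pool.submit(task).join();
    }

    public static void main(String[] args) {
        ClassLoader appLoader = ContextClassLoaderPoolSketch.class.getClassLoader();
        ForkJoinPool pool = newPool(4, appLoader);
        try {
            // Stand-in for the real work (e.g. sending a message): each element
            // reports whether its thread sees the application class loader.
            List<Boolean> sawAppLoader = mapInPool(pool, List.of(1, 2, 3, 4, 5, 6, 7, 8),
                i -> Thread.currentThread().getContextClassLoader() == appLoader);
            System.out.println("all workers used the application class loader: "
                + sawAppLoader.stream().allMatch(b -> b));
        } finally {
            pool.shutdown();
        }
    }
}
```

In the real application the lambda would call kafkaTemplate.send(...) instead of inspecting the class loader. The stack trace in the question goes through CompletableFuture$AsyncRun rather than a parallel stream, so if the same cause applies there, passing such a pool as the second argument of CompletableFuture.runAsync(Runnable, Executor) would be the equivalent change.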


Hello, I still get this error when I use Confluent Cloud to send a message to one topic, but I don't get it locally.