Kafka Classes could not be found during runtime

I get the following error message during runtime, every time a Kafka event is generated in my production environment. Please note that the same code works in my local setup without any issues and all Jenkins builds are successful. The error is the following:

Caused by: o.a.k.c.c.ConfigException: Invalid value io.confluent.kafka.serializers.context.NullContextNameStrategy for configuration context.name.strategy: Class io.confluent.kafka.serializers.context.NullContextNameStrategy could not be found.\n"}

This is my Kafka config class:

 @Bean
    @Qualifier("json-schema")
    public KafkaTemplate<String, MyEvent<?>> jsonSchemaKafkaTemplate() throws ClassNotFoundException {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(setCommonProducerProperties(),
                new StringSerializer(),
                new KafkaJsonSchemaSerializer<>()));
    }


    private  Map<String, Object> setCommonProducerProperties(){
        // Setting common properties
        Map<String, Object> configProps = setBrokerProperties();
        configProps.put(ProducerConfig.RETRIES_CONFIG, KafkaPropertiesConfiguration.JsonSchemaProperties.RETRIES);
        configProps.put(ProducerConfig.CLIENT_ID_CONFIG, 
        return configProps;
    }

    private Map<String, Object> setBrokerProperties() {
        Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropertiesConfiguration.getBroker().getBootstrapServers());
        brokerProperties.put(SCHEMA_REGISTRY_URL_CONFIG, kafkaPropertiesConfiguration.getBroker().getSchemaRegistryUrl());

        KafkaPropertiesConfiguration.SecurityProperties securityProperties = kafkaPropertiesConfiguration.getBroker().getSecurity();
(... Keystore and Truststore credentials)

        return brokerProperties;
    }

Im using the following versions in my POMS:

        <spring-kafka.version>3.1.0</spring-kafka.version>
        <kafka-client.version>3.6.1</kafka-client.version>
        <confluent.version>7.6.0</confluent.version>

and:

   <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka-client.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka-client.version}</version>
            <classifier>test</classifier>
            <scope>test</scope>
        </dependency>

        <!-- Confluent Schema Registry Client -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-json-schema-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>${confluent.version}</version>
        </dependency>

It is worth mentioning that some Kafka classes could not be found and loaded by the context loader during runtime, such as the “StringSerializer” class “KafkaJsonSchemaSerializer”, for this reason I have used the contstructor for both classes in my Kafka template. Otherwise I get the same error message that these classes could not be found.

The NullContextNameStrategy class should come from kafka-schema-serializer-7.6.0.jar, so double check that that jar is on the classpath in your production env.

I suspect that it will be there, and that this is due to the classloader loading a conflicting older version of that dependency. A brute force way to see if there is a conflict is to look for JARs containing a class that has been around for a long time, like RecordNameStrategy. For all directories on the classpath:

for jar in `ls *jar`; do jar tvf $jar | grep "io/confluent/kafka/serializers/subject/RecordNameStrategy.class" ; done;

See if a search like this turns up more than one result, and, if so, try to figure out where they’re each coming from (e.g., via mvn dependency:tree).

Finally, one thing that might be related since this feels like transitive dependency version conflict, is that I see here that spring-kafka 3.1.x is compatible with kafka-clients 3.6.0. I’m not sure if version 3.6.1 in your POM is problematic or “close enough” but it’s worth looking into.

2 Likes

Hello,
i have check my jar and indeed the kafka-schema-serializer-7.6.0.jar is present in the classpath along with all other related jars downloaded from https://packages.confluent.io/maven.

The command line that you have provided doesn’t deliver any result and also downgraded my kafka-clients to 3.6.0. Unfortunalty, i still get the same ConfigException.

This shouldn’t be the case – it should return exactly one result. E.g., jar tvf kafka-schema-serializer-7.6.0.jar should return RecordNameStrategy.class (and NullContextNameStrategy.class). Can you double check this?

The for loop command is intended to see if there is more than one result, which could also be problematic. Maybe, if you’re not getting any result, that jar is corrupted / empty in your production environment.

1 Like

Hi guys,

I’m facing the same problem with Salah. My code runs completely fine locally, but when I pack and run it in Docker then it always throws the below error when creating consumers

Caused by: o.a.k.c.c.ConfigException: Invalid value io.confluent.kafka.serializers.context.NullContextNameStrategy for configuration context.name.strategy: Class io.confluent.kafka.serializers.context.NullContextNameStrategy could not be found.\n"}

The confusing thing is that I can find and import that NullContextNameStrategy class, but that ConfigException still occurs :<

Here are my dependecies versions:

        <dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>3.6.0</version>
		</dependency>

		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>kafka-avro-serializer</artifactId>
			<version>7.6.0</version>
		</dependency>

		<dependency>
			<groupId>io.confluent</groupId>
			<artifactId>kafka-schema-serializer</artifactId>
			<version>7.6.0</version>
		</dependency>

Hello,
I could not figure out why the Kafka dependencies could not be found when I ran it in Docker. But I managed to find a workaround for the problem so that the Kafka Producer could be started successfully using the schema registry. Using the constructors will most likely solve the problem of the class not being found during runtime.

You will find below a snippet of my Kafka Config File:

    @Bean
    public SchemaRegistryClient schemaRegistryClient() {
        return new CachedSchemaRegistryClient("https://mySchemaRegistryUrl", 1000);
    }

    @Bean
    @Qualifier("json-schema")
    public KafkaTemplate<String, MyEvent<?>> jsonSchemaKafkaTemplate(SchemaRegistryClient schemaRegistryClient) throws ClassNotFoundException {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(setCommonProducerProperties(schemaRegistryClient)));
    }

    private Map<String, Object> setCommonProducerProperties(SchemaRegistryClient schemaRegistryClient) throws ClassNotFoundException {
        Map<String, Object> configProps = setBrokerProperties();
        // Configure KafkaJsonSchemaSerializer with SchemaRegistryClient
        KafkaJsonSchemaSerializer<MyEvent<?>> serializer = new KafkaJsonSchemaSerializer<>(schemaRegistryClient);
        serializer.configure(configProps, false);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Class.forName(KafkaJsonSchemaSerializer.class.getName()));
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Class.forName(StringSerializer.class.getName()));

        return configProps;
    }

    private Map<String, Object> setBrokerProperties() {
        Map<String, Object> brokerProperties = new HashMap<>();
        brokerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPropertiesConfiguration.getBroker().getBootstrapServers());
        brokerProperties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "https://mySchemaRegistryUrls");

        // Kafka Broker SSL properties
        brokerProperties.put("security.protocol", "SSL");
        brokerProperties.put("ssl.truststore.location", securityProperties.getSsl().getTrustStoreLocation());
        brokerProperties.put("ssl.truststore.password", securityProperties.getSsl().getTrustStorePassword());
        brokerProperties.put("ssl.keystore.location", securityProperties.getSsl().getKeyStoreLocation());
        brokerProperties.put("ssl.keystore.password", securityProperties.getSsl().getKeyStorePassword());
        brokerProperties.put("ssl.key.password", securityProperties.getSsl().getKeyStorePassword());
        brokerProperties.put("ssl.keystore.type", securityProperties.getSsl().getType());
        brokerProperties.put("ssl.truststore.type", securityProperties.getSsl().getType());
        
        // Schema registry SSL properties
        brokerProperties.put("schema.registry.security.protocol", "SSL");
        brokerProperties.put("schema.registry.ssl.truststore.location", securityProperties.getSsl().getTrustStoreLocation());
        brokerProperties.put("schema.registry.ssl.truststore.password", securityProperties.getSsl().getTrustStorePassword());
        brokerProperties.put("schema.registry.ssl.keystore.location", securityProperties.getSsl().getKeyStoreLocation());
        brokerProperties.put("schema.registry.ssl.keystore.password", securityProperties.getSsl().getKeyStorePassword());
        brokerProperties.put("schema.registry.ssl.key.password", securityProperties.getSsl().getKeyStorePassword());
        brokerProperties.put("schema.registry.ssl.keystore.type", securityProperties.getSsl().getType());
        brokerProperties.put("schema.registry.ssl.truststore.type", securityProperties.getSsl().getType())

        return brokerProperties;
    }
1 Like