I created a kafka
service in docker which internally uses three images. Below are the images
confluentinc/cp-zookeeper:latest
confluentinc/cp-kafka:latest
confluentinc/cp-schema-registry:latest
My docker-compose.yml file looks like below
---
version: '3.8'
networks:
learning:
name: learning
driver: bridge
external: true
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
hostname: zookeeper
ports:
- "2181:2181"
networks:
- learning
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
tmpfs:
- "/datalog"
broker:
image: confluentinc/cp-kafka:latest
container_name: broker
hostname: broker
networks:
- learning
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://host.docker.internal:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_NUM_PARTITIONS: 1
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
schema-registry:
image: confluentinc/cp-schema-registry:latest
container_name: schema-registry
hostname: schema-registry
networks:
- learning
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://host.docker.internal:9092
kafka:
producer:
bootstrap-servers: host.docker.internal:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
client-id: proto-topics
compression-type: snappy
properties:
schema:
registry:
url: host.docker.internal:8081
I have created an application that uses kafka
service for messaging and uses io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
for value-serializer. Below are the configurations in application.yml
in local, docker environments of spring-boot project where local.yml
implies a producer application on local machine connecting to kafka service running in docker and docker.yml
implies a producer application on docker container connecting to kafka service running in another container on docker (connected using an external network-bridge).
local.yml
kafka:
producer:
bootstrap-servers: host.docker.internal:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
client-id: learning
compression-type: snappy
properties:
schema:
registry:
url: http://localhost:8081
docker.yml
kafka:
producer:
bootstrap-servers: host.docker.internal:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
client-id: learning
compression-type: snappy
properties:
schema:
registry:
url: host.docker.internal:8081
When application runs from local (using local.yml
properties), am able to successfully produce messages to kafka, however when I do same from docker container (using docker.yml
properties), I am getting below exception. And am unable to understand what change I should do to fix this issue
learning | org.apache.kafka.common.KafkaException: Failed to construct kafka producer
learning | at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:439)
learning | at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:290)
learning | at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:843)
learning | at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:694)
learning | at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:664)
learning | at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:639)
learning | at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:633)
learning | at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:754)
learning | at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:638)
learning | at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:409)
learning | at learning.services.KafkaProducerService.sendMessage(KafkaProducerService.java:23)
learning | at learning.services.LearningProcessor.lambda$updateLearnings$3(LearningProcessor.java:89)
learning | at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714)
learning | at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
learning | at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
learning | at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)
learning | at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
learning | at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
learning | at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
learning | at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
learning | at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
learning | Caused by: java.lang.ExceptionInInitializerError
learning | at io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer.configure(KafkaProtobufSerializer.java:68)
learning | at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:375)
learning | ... 20 more
learning | Caused by: org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.kafka.serializers.context.NullContextNameStrategy for configuration context.name.strategy: Class io.confluent.kafka.serializers.context.NullContextNameStrategy could not be found.
learning | at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:744)
learning | at org.apache.kafka.common.config.ConfigDef$ConfigKey.<init>(ConfigDef.java:1146)
learning | at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:152)
learning | at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:172)
learning | at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:211)
learning | at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:373)
learning | at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:386)
learning | at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.baseConfigDef(AbstractKafkaSchemaSerDeConfig.java:277)
learning | at io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig.<clinit>(KafkaProtobufSerializerConfig.java:47)
learning | ... 22 more
Below are the dependencies in maven pom.xml
<protobuf-java.version>3.21.12</protobuf-java.version>
<protobuf-java-format.version>1.4</protobuf-java-format.version>
<kafka-protobuf-serializer-version>7.3.1</kafka-protobuf-serializer-version>
Can you please advice?