I am really struggling to set up a Kafka configuration that can consume topics whose messages are Avro-encoded, using schemas stored in AWS Glue.
I am using Clojure and the jackdaw library.
(require '[jackdaw.client :as jc])

(def consumer
  (jc/subscribed-consumer
   {"bootstrap.servers" (:kafka-servers config/config)
    ;; A plain consumer reads "auto.offset.reset"; the "consumer." prefix
    ;; is only meaningful inside a Kafka Streams config.
    "auto.offset.reset" "latest"
    "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
    "value.deserializer" "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer"
    "avroRecordType" "GENERIC_RECORD"
    "registry.name" "test123"
    "group.id" "test123"
    "max.poll.records" (.intValue 1)
    "schemaName" "test"}
   [{:topic-name "test"}]))

;; Poll first; close the consumer only once polling is done.
(jc/poll consumer 1000)
(.close consumer)
This setup works and I am able to receive messages, but it is a regular consumer and I want to use Kafka Streams.
(defn build-topology [builder input-topics]
  (let [topic-maps (map topic-config input-topics)]
    (-> (jstreams/kstreams builder topic-maps)
        (jstreams/peek forward)))
  builder)

(mount/defstate kafka
  :start (let [builder (jstreams/streams-builder)
               topology (build-topology builder ["test123"])
               application-config {"bootstrap.servers" (:kafka-servers config/config)
                                   "consumer.auto.offset.reset" "latest"
                                   "application.id" (str (:profile (mount/args)) "-match-status-" (random-uuid))
                                   ;;"cache.max.bytes.buffering" 0
                                   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
                                   "value.deserializer" "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer"
                                   "avroRecordType" "GENERIC_RECORD"
                                   "awsRegion" "eu-west-1"
                                   "registryName" "test123"
                                   "schemaName" "test123"}
               application (jstreams/kafka-streams
                            topology
                            application-config)]
           (timbre/info "Starting Kafka Streams application..." {:application-config application-config})
           (jstreams/start application)
           {:application application
            :application-config application-config
            :builder builder
            :topology topology})
  :stop (do (jstreams/close (:application kafka))
            {}))
But nothing happens.
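One difference that may be worth ruling out: the working consumer passes "registry.name", while the Streams config passes "registryName" and "awsRegion". As far as I can tell the Glue library expects "registry.name" and "region", so the camelCase variants may simply be ignored, but that is an assumption to verify against the library's AWSSchemaRegistryConstants. The sketch below only compares the Glue-related keys of the two maps; `config-key-diff` and the two `def`s are hypothetical names introduced for illustration.

```clojure
(require '[clojure.set :as set])

(defn config-key-diff
  "Returns the keys that appear in only one of the two config maps."
  [consumer-config streams-config]
  (let [consumer-keys (set (keys consumer-config))
        streams-keys  (set (keys streams-config))]
    {:only-in-consumer (set/difference consumer-keys streams-keys)
     :only-in-streams  (set/difference streams-keys consumer-keys)}))

;; Glue-related entries copied from the two configs above.
(def consumer-glue-config
  {"avroRecordType" "GENERIC_RECORD"
   "registry.name"  "test123"
   "schemaName"     "test"})

(def streams-glue-config
  {"avroRecordType" "GENERIC_RECORD"
   "awsRegion"      "eu-west-1"
   "registryName"   "test123"
   "schemaName"     "test123"})

(config-key-diff consumer-glue-config streams-glue-config)
;; => {:only-in-consumer #{"registry.name"}
;;     :only-in-streams  #{"awsRegion" "registryName"}}
```

This does not say which spelling is correct; it only makes the mismatch between the two configs visible.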
I am leaning toward this not being a Clojure-specific or jackdaw-specific question, but more of a Kafka question.
I lack the knowledge to work out why this isn’t working.
I also tried adopting the approach from awslabs/aws-glue-schema-registry on GitHub (the AWS Glue Schema Registry client library, which provides serializers and deserializers for Avro, JSON and Protobuf; see https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html), but I get the same behavior.
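A point that may matter here: as far as I understand, Kafka Streams does not read "key.deserializer" / "value.deserializer". It works with serdes, supplied either per topic or through "default.key.serde" / "default.value.serde". The aws-glue-schema-registry README describes a Kafka Streams serde shipped in a separate kafkastreams module. Below is a minimal sketch of a config map under those assumptions. `streams-config` is a hypothetical helper, and the fully qualified serde class name and the "region" / "registry.name" keys are my reading of that README and should be checked against the library version in use.

```clojure
(defn streams-config
  "Builds a Kafka Streams config map that relies on default serdes
   instead of consumer-style deserializer keys."
  [{:keys [bootstrap-servers application-id region registry-name]}]
  {"bootstrap.servers"          bootstrap-servers
   "application.id"             application-id
   "consumer.auto.offset.reset" "latest"
   ;; Streams takes serdes, not deserializers.
   "default.key.serde"   "org.apache.kafka.common.serialization.Serdes$StringSerde"
   ;; Assumed class name from the library's kafkastreams module.
   "default.value.serde" "com.amazonaws.services.schemaregistry.kafkastreams.GlueSchemaRegistryKafkaStreamsSerde"
   "avroRecordType"      "GENERIC_RECORD"
   ;; Assumed Glue config keys; verify against AWSSchemaRegistryConstants.
   "region"              region
   "registry.name"       registry-name})

(def example-config
  (streams-config {:bootstrap-servers "localhost:9092" ; placeholder broker
                   :application-id    "match-status"
                   :region            "eu-west-1"
                   :registry-name     "test123"}))
```

If the topic maps passed to jackdaw carry their own :key-serde / :value-serde, those presumably take precedence over the defaults, so the per-topic config would need checking as well.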
I am looking for guidance.
Thank you for your time.