Kafka Streams with AWS Glue AVRO messages

I am really struggling to set up a Kafka configuration that can consume topics with AVRO-encoded messages whose schemas live in AWS Glue Schema Registry.

I am using Clojure and the jackdaw library.

(def consumer
  (jc/subscribed-consumer
   {"bootstrap.servers"  (:kafka-servers config/config)
    "auto.offset.reset"  "latest" ;; plain consumer property; the consumer. prefix is only for Streams configs
    "key.deserializer"   "org.apache.kafka.common.serialization.StringDeserializer"
    "value.deserializer" "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer"
    "avroRecordType"     "GENERIC_RECORD"
    "registry.name"      "test123"
    "group.id"           "test123"
    "max.poll.records"   (int 1) ;; Kafka expects a 32-bit integer here
    "schemaName"         "test"}
   [{:topic-name "test"}]))

(jc/poll consumer 1000)

;; close only once polling is done
(.close consumer)

This setup works and I am able to receive messages, but this is a regular consumer; I want to use Kafka Streams.

(defn build-topology [builder input-topics]
  (let [topic-maps (map topic-config input-topics)]
    ;; side-effect-only stage: forward prints each record (helpers sketched below)
    (-> (jstreams/kstreams builder topic-maps)
        (jstreams/peek forward)))
  builder)
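
For context, topic-config and forward are not shown here; a hypothetical reconstruction of what they look like, based on how they are used:

;; hypothetical: jackdaw topic configs are plain maps with a :topic-name key
(defn topic-config [topic-name]
  {:topic-name topic-name})

;; hypothetical: jackdaw's peek hands the function [key value] vectors
(defn forward [[k v]]
  (println "received:" k v))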



(mount/defstate kafka
  :start (let [builder (jstreams/streams-builder)
               topology (build-topology builder ["test123"])
               application-config {"bootstrap.servers" (:kafka-servers config/config)
                                   "consumer.auto.offset.reset" "latest"
                                   "application.id" (str (:profile (mount/args)) "-match-status-" (random-uuid))
                                   ;;"cache.max.bytes.buffering" 0
                                   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
                                   "value.deserializer" "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer"
                                   "avroRecordType" "GENERIC_RECORD"
                                   "awsRegion" "eu-west-1"
                                   "registryName" "test123"
                                   "schemaName" "test123"
                                   }
               application (jstreams/kafka-streams
                            topology
                            application-config)]
           (timbre/info "Starting Kafka Streams application..." {:application-config application-config})
           (jstreams/start application)

           {:application application
            :application-config application-config
            :builder builder
            :topology topology})

  :stop (do (jstreams/close (:application kafka))
            {}))

But nothing happens.

I am leaning toward this not being a Clojure-specific or jackdaw-specific question, but more of a general Kafka question. I am lacking the skill or knowledge to see why this isn't working.

I also tried adopting the awslabs/aws-glue-schema-registry client library (https://github.com/awslabs/aws-glue-schema-registry; see also https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html), but saw the same behavior.
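
For reference, this is the serde artifact from that repo as I added it (version left as a placeholder, pick the current release):

;; deps.edn entry for the Glue serde library (version is a placeholder)
{:deps {software.amazon.glue/schema-registry-serde {:mvn/version "..."}}}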

Looking for guidance.

Thank you for your time.

Sorry, I can hardly read this code, so I'm not totally sure what you are trying to do. I don't know what jackdaw is either.

But nothing happens.

If KafkaStreams#start() is called, it will spin up background threads to subscribe to the input topics, start processing, and write to result topics. I see some peek call, but I'm not sure what forward would do.

Kafka Streams also outputs logs. Did you inspect them to see what it's doing?

Hey thanks for answering!

Sadly jackdaw (an abstraction over Kafka Streams) doesn't show logs by default. I will look into whether output or a stack trace can be surfaced by enabling some options.
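
In the meantime I can at least watch the streams state via interop. Untested sketch; it assumes jackdaw's kafka-streams hands back the raw org.apache.kafka.streams.KafkaStreams instance:

;; quick check at the REPL once mount has started the state:
(.state (:application kafka)) ;; => CREATED / REBALANCING / RUNNING / ERROR ...

;; or a listener, registered between (jstreams/kafka-streams ...) and (jstreams/start ...):
(import '(org.apache.kafka.streams KafkaStreams$StateListener))

(.setStateListener application
                   (reify KafkaStreams$StateListener
                     (onChange [_ new-state old-state]
                       (timbre/info "streams state change"
                                    {:from (str old-state) :to (str new-state)}))))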

forward just prints the message to standard out, to rule out the function passed to peek itself causing issues.

My intuition is that this is a SerDe problem: I am not sure how to set up Kafka Streams so it knows to pick up the proper schema from AWS Glue.
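
Digging further into the awslabs library linked above: Kafka Streams never reads key.deserializer / value.deserializer; it builds its (de)serializers from default.key.serde / default.value.serde, and the library ships a dedicated Streams serde, AWSKafkaAvroSerDe. An untested sketch of how my application-config might change (class and constant names come from that library; I have not verified this end to end):

(import '(com.amazonaws.services.schemaregistry.utils AWSSchemaRegistryConstants AvroRecordType))

(def application-config
  {"bootstrap.servers"          (:kafka-servers config/config)
   "application.id"             (str (:profile (mount/args)) "-match-status-" (random-uuid))
   "consumer.auto.offset.reset" "latest" ;; the consumer. prefix is how Streams forwards consumer configs
   "default.key.serde"          "org.apache.kafka.common.serialization.Serdes$StringSerde"
   "default.value.serde"        "com.amazonaws.services.schemaregistry.kafkastreams.AWSKafkaAvroSerDe"
   ;; Glue settings, keyed via the library's own constants (they are plain strings)
   AWSSchemaRegistryConstants/AWS_REGION       "eu-west-1"
   AWSSchemaRegistryConstants/REGISTRY_NAME    "test123"
   AWSSchemaRegistryConstants/SCHEMA_NAME      "test123"
   AWSSchemaRegistryConstants/AVRO_RECORD_TYPE (.getName AvroRecordType/GENERIC_RECORD)})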

Is there a working Java example that uses AWS Glue, AVRO and Kafka Streams?
I failed to find one that fits my use case (it's either Confluent Avro or a regular consumer).

Sadly jackdaw (an abstraction over Kafka Streams) doesn't show logs by default.

Well, you say “by default”. Can’t you change the default? Inspecting the logs is the easiest way to find the root cause.
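
For what it's worth, the Kafka clients and Kafka Streams log through SLF4J, so if no SLF4J binding is on the classpath the logs silently go nowhere. Since the app already uses timbre, one option is the slf4j-timbre bridge (version left as a placeholder, pick the current release):

;; deps.edn — routes SLF4J output (including Kafka Streams logs) into timbre
{:deps {com.fzakaria/slf4j-timbre {:mvn/version "..."}}}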

My intuition is that this is a SerDe problem

By default, Kafka Streams crashes if a serde throws an exception. You can also plug in a custom DeserializationExceptionHandler via the configs if that helps.
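
For example, the built-in LogAndContinueExceptionHandler logs and skips records that fail deserialization instead of crashing the application, which also makes the failure visible in the logs:

;; merged into the streams application-config map
{"default.deserialization.exception.handler"
 "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"}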

Is there a working Java example that uses AWS Glue, AVRO and Kafka Streams?
I failed to find one that fits my use case (it's either Confluent Avro or a regular consumer).

I don't know of any Glue example. Well, this is the Confluent forum, and I work for Confluent; we don't advertise Glue but our own Schema Registry :slight_smile: