Confluent Azure Event Hubs source connector shows bytes in value schema

Hi there,

We are using Confluent 6.1.0 and the Azure Event Hubs source connector 1.2.1 to connect Azure Event Hubs to Confluent Kafka. It works fine and we can receive messages from Event Hubs, but the value schema is always bytes, which has us really confused. Can you please help us with that? Thanks a lot.

The JSON body:

{
  "name": "eventhub_to_kafka_pullt12",
  "config": {
    "confluent.topic.bootstrap.servers": "broker:29092",
    "connector.class": "io.confluent.connect.azure.eventhubs.EventHubsSourceConnector",
    "kafka.topic": "evenhubt5",
    "tasks.max": "1",
    "max.events": "10",
    "azure.eventhubs.sas.keyname": "xxx",
    "azure.eventhubs.sas.key": "xxx",
    "azure.eventhubs.namespace": "xxx",
    "azure.eventhubs.hub.name": "xxx",
    "offsets.topic.replication.factor": "1",
    "confluent.license.topic.replication.factor": "1",
    "transaction.state.log.replication.factor": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schema.registry.url": "http://xxxx:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter"
  }
}

The error:

The docs don’t seem to elaborate a great deal here. I’ve not worked with Event Hubs, but in integrations from similar upstream sources (messaging systems), e.g. ActiveMQ and IBM MQ, I’ve used org.apache.kafka.connect.converters.ByteArrayConverter, which might be what you need to do here.

The nuance is that the payload from a messaging system doesn’t necessarily have a schema as interpreted by the converter; the converter might just take it as a lump of data, a chunk of bytes. Once you’ve got those bytes you can infer or apply the schema that you know exists.
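For what it’s worth, here is a minimal sketch of what that converter change could look like against the config above. The xxx placeholders stand for your own values as before, the schema registry URL is dropped because this converter doesn’t use one, and the replication-factor style settings are left out just to keep the sketch short:

{
  "name": "eventhub_to_kafka_pullt12",
  "config": {
    "connector.class": "io.confluent.connect.azure.eventhubs.EventHubsSourceConnector",
    "kafka.topic": "evenhubt5",
    "tasks.max": "1",
    "max.events": "10",
    "azure.eventhubs.sas.keyname": "xxx",
    "azure.eventhubs.sas.key": "xxx",
    "azure.eventhubs.namespace": "xxx",
    "azure.eventhubs.hub.name": "xxx",
    "confluent.topic.bootstrap.servers": "broker:29092",
    "confluent.license.topic.replication.factor": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}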

Here are a couple of examples to check:

It is pointless to set the property value.converter to io.confluent.connect.avro.AvroConverter if the producer of the records doesn’t store the schema along with the record during transmission. What Kafka receives will always be bytes, since there is no tangible way for the source connector (which, remember, acts kind of like a Kafka consumer for the source system) to retrieve the schema from the record and perform deserialization.

Without knowing much about this connector (its source code is private and not available publicly on GitHub), I would say that it needs to have a schema in order to be able to invoke the toConnectData() method from the Converter implementation.
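To make that concrete, here is a rough sketch (plain Kafka Connect API, not the connector’s own code) of what a converter’s toConnectData() gives you when no schema travels with the payload. The ByteArrayConverter suggested above simply wraps the raw bytes in an optional BYTES schema, which is exactly why the value schema shows up as bytes. The class name and the payload contents are made up for illustration; the topic name is the one from the config above:

import org.apache.kafka.connect.converters.ByteArrayConverter;
import org.apache.kafka.connect.data.SchemaAndValue;

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class BytesSchemaDemo {
    public static void main(String[] args) {
        // The converter mentioned above: it performs no deserialization at all.
        ByteArrayConverter converter = new ByteArrayConverter();
        converter.configure(Map.of(), false); // false = acting as the value converter

        // Stand-in for a raw payload pulled from Event Hubs (hypothetical content).
        byte[] rawPayload = "{\"hello\":\"world\"}".getBytes(StandardCharsets.UTF_8);

        // There is no schema embedded in plain bytes, so the resulting Connect
        // record carries a BYTES schema rather than an Avro schema.
        SchemaAndValue connectData = converter.toConnectData("evenhubt5", rawPayload);

        System.out.println(connectData.schema().type()); // prints: BYTES
    }
}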

@riferrei
