MQTT source connector: transform byte[] payload to JSON

Hello. I have an MQTT producer that sends the message payload as a byte array to the MQTT source connector. Is there a way for the connector to transform the byte array message into actual JSON so that the messages follow the topic schema?
I have this configuration for the connector:
{
  "name": "MqttSourceConnectorTemp",
  "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
  "mqtt.qos": "0",
  "confluent.topic.bootstrap.servers": "kafka:9092",
  "tasks.max": "1",
  "mqtt.clean.session.enabled": "true",
  "transaction.boundary": "poll",
  "mqtt.server.uri": "tcp://mqtt-broker:1883",
  "header.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "false",
  "exactly.once.support": "requested",
  "confluent.topic.replication.factor": "1",
  "value.converter.schemas.enable": "false",
  "mqtt.topics": "mqtt/temp/#",
  "kafka.topic": "temp",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}

Can you explain the serialization process? How does the byte array get created?

On the Kafka side, are you using Schema Registry and JSON Schema-formatted values? Or just JSON strings?

I have a Java producer that creates a JSON message and sends it to the MQTT broker, from where it flows through the MQTT source connector. The MqttMessage constructor in the Eclipse Paho client library requires a byte[] payload, so I set "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter". What can I do to convert the byte array to proper JSON? Later in the pipeline I use ksqlDB and Kafka Streams with json_sr as the value format, but I want the input topic to be proper JSON as well so that I can write the messages to a database using the JDBC sink connector. As it is now, I can't use the JDBC sink connector with a byte array payload to map the JSON fields to table columns.

// Gson JsonObject; formatter, client, and topic are defined elsewhere in the producer
Random randomOffset = new Random();
double randomTemperature = randomOffset.nextDouble(86, 88);

JsonObject json = new JsonObject();
json.addProperty("message_id", UUID.randomUUID().toString());
json.addProperty("temperature", randomTemperature);
json.addProperty("timestamp", ZonedDateTime.now(ZoneOffset.UTC).format(formatter));

// Convert JSON to String and set as payload
String content = json.toString();
MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
message.setQos(0);
client.publish(topic, message);

A few ideas come to mind:

  1. Check out this BytesToString SMT (GitHub), which can be added to the connector config to get you back to the string you built in your MQTT client code. You'd use org.apache.kafka.connect.storage.StringConverter as the value converter instead. This wouldn't give you JSON Schema, but it would get you something closer and potentially easier to work with (see the config sketch after this list).
  2. Generate the proper JSON Schema-formatted message on the MQTT client side. Here is the wire format doc: the payload needs to be a zero magic byte, then 4 bytes for the schema ID, then the JSON Schema-serialized data, and KafkaJsonSchemaSerializer.serialize() produces exactly this framing for you. Keep using ByteArrayConverter in the connector config.
  3. Land the data as-is as a byte array and then write a stream processing app to convert it. E.g., here is a Kafka Streams app that converts from Avro to Protobuf, which could be a starting point for converting from a UTF-8 encoded string to JSON Schema format. This obviously adds overhead compared to option 2, given the extra topic-to-topic transformation.
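
For option 1, here's a minimal sketch of the connector config changes, assuming the BytesToString transform from the kafka-connect-transform-common project is installed on the Connect worker (double-check the class name and config keys against that project's README):

"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms": "bytesToString",
"transforms.bytesToString.type": "com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value",
"transforms.bytesToString.charset": "UTF-8"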

I would give option 2 a try first. It feels a bit hacky and I could be missing something that would prevent it from working, but the efficiency, plus getting into a Schema Registry format from the get-go, makes it worthwhile IMO (assuming it works 🙂). A rough sketch of the client-side change is below.
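
This is untested and assumes the Confluent kafka-json-schema-serializer dependency is on the MQTT client's classpath, that Schema Registry is reachable at http://schema-registry:8081 (adjust for your setup), and it uses a hypothetical TemperatureReading POJO with the three fields from your JSON:

// Hypothetical POJO for illustration; public fields keep Jackson's schema generation simple
public class TemperatureReading {
    public String message_id;
    public double temperature;
    public String timestamp;
}

// Assumed imports: io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer,
// org.eclipse.paho.client.mqttv3.MqttMessage, java.util.*, java.time.*
Map<String, Object> serdeProps = new HashMap<>();
serdeProps.put("schema.registry.url", "http://schema-registry:8081");

KafkaJsonSchemaSerializer<TemperatureReading> serializer = new KafkaJsonSchemaSerializer<>();
serializer.configure(serdeProps, false); // false = configure as a value serializer

TemperatureReading reading = new TemperatureReading();
reading.message_id = UUID.randomUUID().toString();
reading.temperature = randomTemperature;
reading.timestamp = ZonedDateTime.now(ZoneOffset.UTC).format(formatter);

// serialize() registers/looks up the schema under the "temp-value" subject and returns
// magic byte + 4-byte schema ID + the JSON payload, i.e. the Schema Registry wire format
byte[] payload = serializer.serialize("temp", reading);

MqttMessage message = new MqttMessage(payload);
message.setQos(0);
client.publish(topic, message);

Since the MQTT source connector keeps ByteArrayConverter, those bytes should land on the temp topic unchanged and downstream consumers can read them as json_sr.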


I'm trying to simulate a real IoT scenario, so I think idea 1 is the best solution. Would I be able to use the JDBC sink connector and dump the JSON string messages into a single-/multiple-column table? If I set a schema on the topic that holds the JSON string messages, would that be sufficient?

Instead of #1, there is a much better FromJson SMT here (GitHub) for what you want to do. The Confluent Platform cp-demo example also uses it here.

You can use it on the source or the sink.

To use it on the source, since the record flow is SMT → converter, you would apply this SMT and then one of the Schema Registry converters, as the cp-demo example does.
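
For example, on the MqttSourceConnector side it might look roughly like this (the FromJson class name and config keys are from memory, so verify them against the SMT's README; the schema URL is just a placeholder for a JSON Schema file describing your three fields, readable from the Connect worker):

"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "fromJson",
"transforms.fromJson.type": "com.github.jcustenborder.kafka.connect.transform.fromjson.FromJson$Value",
"transforms.fromJson.json.schema.location": "Url",
"transforms.fromJson.json.schema.url": "file:///schemas/temp.schema.json"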

To use it on the sink, since the record flow is converter → SMT, and assuming you keep ByteArrayConverter in the MqttSourceConnector, you would also use ByteArrayConverter on the sink. Here is an example that uses StringConverter followed by this SMT in a JdbcSinkConnector.
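
Along the same lines, a rough sketch of the JDBC sink config (connection details and table handling are placeholders; same caveat on the FromJson class name and keys):

{
  "name": "JdbcSinkTemp",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "topics": "temp",
  "connection.url": "jdbc:postgresql://postgres:5432/iot",
  "auto.create": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "transforms": "fromJson",
  "transforms.fromJson.type": "com.github.jcustenborder.kafka.connect.transform.fromjson.FromJson$Value",
  "transforms.fromJson.json.schema.location": "Url",
  "transforms.fromJson.json.schema.url": "file:///schemas/temp.schema.json"
}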
