Hello. I have an MQTT producer that sends the message payload as a byte array to the MQTT source connector. Is there a way for the connector to transform the byte array message to actual json so that the messages follow the topic schema?
I have this configuration for the connector:
{
"name": "MqttSourceConnectorTemp",
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"mqtt.qos": "0",
"confluent.topic.bootstrap.servers": "kafka:9092",
"tasks.max": "1",
"mqtt.clean.session.enabled": "true",
"transaction.boundary": "poll",
"mqtt.server.uri": "tcp://mqtt-broker:1883",
"header.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"exactly.once.support": "requested",
"confluent.topic.replication.factor": "1",
"value.converter.schemas.enable": "false",
"mqtt.topics": "mqtt/temp/#",
"kafka.topic": "temp",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
Can you explain the serialization process? How does the byte array get created?
On the Kafka side, are you using Schema Registry and JSON Schema-formatted values? Or just JSON strings?
I have a Java producer that creates a JSON message and sends it to the MQTT broker, and from there through the MQTT source connector. The MqttMessage constructor of the Eclipse Paho client library requires a byte[] payload, so I set "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter". What can I do to convert the byte array to proper JSON? Later in the pipeline I use ksqlDB and Kafka Streams with JSON_SR as the value format, but I want the input topic to be proper JSON as well so I can write the messages to a database using the JDBC sink connector. As it is now, I can't use the JDBC sink connector with a byte array to map the JSON fields to the table columns.
// formatter, client, and topic are created elsewhere in the producer
Random randomOffset = new Random();
double randomTemperature = randomOffset.nextDouble(86, 88);
JsonObject json = new JsonObject();
json.addProperty("message_id", UUID.randomUUID().toString());
json.addProperty("temperature", randomTemperature);
json.addProperty("timestamp", ZonedDateTime.now(ZoneOffset.UTC).format(formatter));
// Convert JSON to String and set as UTF-8 byte payload
String content = json.toString();
MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
message.setQos(0);
client.publish(topic, message);
A few ideas come to mind:
- Check out this BytesToString SMT (GitHub), which can be added to the connector config to get you back to the string from your MQTT client code. You'd use org.apache.kafka.connect.storage.StringConverter instead. This wouldn't give you JSON Schema, but would get you to something closer / potentially easier to play with.
- Generate the proper JSON Schema-formatted message on the MQTT client side. Here is the wire format doc. So you would produce the wire format — a zero magic byte, then 4 bytes for the schema ID, then the JSON payload — which is exactly what KafkaJsonSchemaSerializer.serialize() outputs. Keep using ByteArrayConverter in the connector config.
- Land the data as is in a byte array and write a stream processing app to convert. E.g., here is a Kafka Streams app that converts from Avro to Protobuf and could be a starting point for converting from a UTF-8 encoded string to JSON Schema format. This obviously adds overhead compared to option 2, given the extra topic-to-topic transformation.
I would give option 2 a try first. It feels a bit hacky and I could be missing something that would prevent it from working, but the efficiency plus getting into a Schema Registry format from the get-go makes it worthwhile IMO (assuming it works).
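Roughly what I have in mind for option 2, as an untested sketch: it assumes a Schema Registry reachable at http://schema-registry:8081, reuses the broker URI and topic names from your config, and swaps the Gson JsonObject for a hypothetical TemperatureReading POJO so the serializer can derive a JSON Schema from it:

import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

public class SchemaAwareMqttProducer {

    // Plain POJO; the serializer derives a JSON Schema from it via Jackson
    public static class TemperatureReading {
        public String message_id;
        public double temperature;
        public String timestamp;
    }

    public static void main(String[] args) throws Exception {
        MqttClient client = new MqttClient("tcp://mqtt-broker:1883", MqttClient.generateClientId());
        client.connect();

        // One-time serializer setup; "false" means it serializes record values, not keys
        KafkaJsonSchemaSerializer<TemperatureReading> serializer = new KafkaJsonSchemaSerializer<>();
        serializer.configure(Map.of("schema.registry.url", "http://schema-registry:8081"), false);

        TemperatureReading reading = new TemperatureReading();
        reading.message_id = UUID.randomUUID().toString();
        reading.temperature = new Random().nextDouble(86, 88);
        reading.timestamp = ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);

        // serialize() registers/looks up the schema and returns the Schema Registry wire
        // format (magic byte + 4-byte schema ID + JSON), ready to use as the MQTT payload
        byte[] payload = serializer.serialize("temp", reading);

        MqttMessage message = new MqttMessage(payload);
        message.setQos(0);
        client.publish("mqtt/temp/device1", message); // topic name is just an example

        client.disconnect();
    }
}

Because the MQTT payload already carries the wire-format bytes, the source connector can keep ByteArrayConverter and the records land on the temp topic as JSON Schema data.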
I'm trying to simulate a real IoT scenario, so I think idea 1 is the best solution, right? Would I be able to use the JDBC sink connector and dump the JSON string messages into a single- or multi-column table? If I set a schema on the topic that holds the JSON string messages, would that be sufficient?
Instead of #1 there is a much better FromJson SMT here (GitHub) for what you want to do. The Confluent Platform cp-demo example also uses it here.
You can use it on the source or the sink.
To use on the source, since the record flow is SMT → converter, you would use this SMT and then one of the Schema Registry converters as the CP demo example does.
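For example, on the source side the relevant parts of your config could look roughly like this — an untested sketch; double-check the SMT's property names against its README, the Schema Registry URL is assumed, and the inline schema just mirrors the fields your producer writes:
{
"name": "MqttSourceConnectorTemp",
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"mqtt.server.uri": "tcp://mqtt-broker:1883",
"mqtt.topics": "mqtt/temp/#",
"kafka.topic": "temp",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "fromJson",
"transforms.fromJson.type": "com.github.jcustenborder.kafka.connect.json.FromJson$Value",
"transforms.fromJson.json.schema.location": "Inline",
"transforms.fromJson.json.schema.inline": "{\"type\":\"object\",\"properties\":{\"message_id\":{\"type\":\"string\"},\"temperature\":{\"type\":\"number\"},\"timestamp\":{\"type\":\"string\"}}}"
}
The SMT turns the raw bytes/string into a structured record, and the JsonSchemaConverter then writes it to the topic in Schema Registry JSON Schema format.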
To use on the sink, since the record flow is converter → SMT, and assuming you used ByteArrayConverter in the MqttSourceConnector, you would also use ByteArrayConverter on the sink. Here is an example that uses StringConverter followed by this SMT in a JdbcSinkConnector.
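On the sink side, something along these lines could work — again an untested sketch; the connection settings are placeholders, and the same caveat about checking the SMT's property names applies:
{
"name": "JdbcSinkConnectorTemp",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"topics": "temp",
"connection.url": "jdbc:postgresql://postgres:5432/iot",
"connection.user": "postgres",
"connection.password": "postgres",
"auto.create": "true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"transforms": "fromJson",
"transforms.fromJson.type": "com.github.jcustenborder.kafka.connect.json.FromJson$Value",
"transforms.fromJson.json.schema.location": "Inline",
"transforms.fromJson.json.schema.inline": "{\"type\":\"object\",\"properties\":{\"message_id\":{\"type\":\"string\"},\"temperature\":{\"type\":\"number\"},\"timestamp\":{\"type\":\"string\"}}}"
}
Here the bytes land in the sink task as-is, the FromJson SMT builds a structured record from them, and the JDBC sink can then map message_id, temperature, and timestamp to table columns.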