Problem decoding a Rabbit message

Hi, I'm using Confluent Kafka with Python and I'm testing the RabbitMQ connector. A data producer writes to a Rabbit queue, and the connector writes that data to a Kafka topic. When a consumer reads the Kafka topic, I receive this error:

Traceback (most recent call last):
  File "./temp_consumer.py", line 84, in <module>
    data = json.loads(record_value)
  File "/usr/lib/python3.8/json/__init__.py", line 343, in loads
    s = s.decode(detect_encoding(s), 'surrogatepass')
  File "/usr/lib/python3.8/encodings/utf_32_be.py", line 11, in decode
    return codecs.utf_32_be_decode(input, errors, True)
UnicodeDecodeError: 'utf-32-be' codec can't decode bytes in position 4-7: code point not in range(0x110000)

The error seems to come from the JSON decoding call shown in the traceback:

data = json.loads(record_value)

The record value before decoding is:

b'\x00\x00\x00\x00\x02{"id": 15, "body": "4.8", "timestamp": "2021-03-21 01:00:00"}' 

Can someone help me?
Thanks.

It looks like you may be using io.confluent.connect.json.JsonSchemaConverter rather than org.apache.kafka.connect.json.JsonConverter. The difference is that the former stores the schema in the Schema Registry and prepends a few bytes to each message that identify that schema.
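
To make those leading bytes concrete: in the Confluent wire format the payload starts with a one-byte magic value (0) followed by a four-byte big-endian schema ID, and the JSON document comes after that. Because the message begins with zero bytes, json.loads guesses UTF-32-BE, which is where the odd codec in the traceback comes from. The sketch below, using the record value from the question, splits the header off by hand. split_confluent_frame is a hypothetical helper name used only for illustration; in a real consumer the JSON Schema deserialiser would normally handle this for you.

```python
import json
import struct


def split_confluent_frame(raw: bytes):
    """Split a Schema Registry framed message into (schema_id, payload)."""
    # Byte 0 is the magic byte and is expected to be 0.
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not a Schema Registry framed message")
    # Bytes 1-4 hold the schema ID as a big-endian unsigned 32-bit integer.
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]


# Record value copied from the question.
record_value = b'\x00\x00\x00\x00\x02{"id": 15, "body": "4.8", "timestamp": "2021-03-21 01:00:00"}'

schema_id, payload = split_confluent_frame(record_value)
data = json.loads(payload)  # the remaining bytes are plain UTF-8 JSON
print(schema_id, data)
```

This only shows what is in the message; it does not validate the payload against the registered schema.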

So either use the JSON Schema deserialiser in your consumer, or use org.apache.kafka.connect.json.JsonConverter in your connector. The disadvantage of the latter is that the schema is no longer stored in the Schema Registry, where it is very useful for anyone else who wants to consume the data.

Ref: Why JSON isn’t the same as JSON Schema in Kafka Connect converters and ksqlDB (Viewing Kafka messages bytes as hex)

Hi @rmoff, thanks for your reply!
Can you suggest how to use org.apache.kafka.connect.json.JsonConverter or the JSON Schema deserialiser? Is there a tutorial for Python?
Thanks.

The converter is something you specify in your Kafka Connect source connector configuration.
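
As a rough illustration of where that setting lives: the converter is chosen per connector through the value.converter property. The sketch below only builds the configuration as a Python dict and prints it as JSON. existing_config is a hypothetical stand-in for whatever settings your source connector already has; only the two converter keys are the point here.

```python
import json

# Hypothetical placeholder for the settings your connector already uses.
existing_config = {
    "tasks.max": "1",
}

# Converter settings for plain JSON values.
converter_overrides = {
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    # "false" writes the bare JSON document without a schema/payload envelope.
    "value.converter.schemas.enable": "false",
}

# Later keys win, so the overrides replace any previous converter settings.
connector_config = {**existing_config, **converter_overrides}
print(json.dumps(connector_config, indent=2))
```

Whether plain JSON is the right choice depends on the trade-off mentioned earlier: with this converter no schema is stored in the Schema Registry.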

I would start with the docs and sample code.

Hi @rmoff,

thanks for your patience, but your suggestions didn't solve my problem.
Let me share my code:

The first part is my RabbitMQ producer; after it comes my Kafka consumer.
The idea is to query an API for temperature data and send it to a Rabbit queue. I then want Kafka to read from this queue and store the messages in a Kafka topic.
In a terminal I launch the pika producer and, once it has finished, I launch the Kafka consumer… but I receive the error:

b'\x00\x00\x00\x00\x02{"id": 3, "body": "19.3", "timestamp": "2021-03-28 13:00:00"}'
Traceback (most recent call last):
  File "./temp_consumer.py", line 89, in <module>
    data = json.loads(record_value)
  File "/usr/lib/python3.8/json/__init__.py", line 343, in loads
    s = s.decode(detect_encoding(s), 'surrogatepass')
  File "/usr/lib/python3.8/encodings/utf_32_be.py", line 11, in decode
    return codecs.utf_32_be_decode(input, errors, True)
UnicodeDecodeError: 'utf-32-be' codec can't decode bytes in position 4-7: code point not in range(0x110000)

This is an example message sent to Rabbit:

"{\"id\": 23, \"body\": \"10.8\", \"timestamp\": \"2021-03-29 09:00:00\"}"

I tried to define the schema in Confluent Control Center like this:

{
  "$schema": "draft-07/schema#",
  "$id": "myURI.schema.json",
  "title": "value_temperature_min",
  "description": "Sample of temp min.",
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "id": {
      "type": "integer",
      "description": "id"
    },
    "body": {
      "type": "float",
      "description": "Temp min value"
    },
    "timestamp": {
      "type": "datetime",
      "description": "date time of temp min."
    }
  }
}

but I receive a validation error: unknown type: “object”.

Can you give me some tips?
Thanks.
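
A side note on the schema above, separate from the "object" error (whose cause is not clear from what is posted here): the JSON Schema type keyword only accepts null, boolean, object, array, number, string and integer, so "float" and "datetime" are not type names it recognises. A decimal value is usually declared as "number", and a timestamp as "string". The sketch below is a minimal check with a hypothetical helper, find_unknown_types, run against a trimmed copy of the schema; it is not a full validator.

```python
# The seven primitive type names defined by JSON Schema.
JSON_SCHEMA_TYPES = {"null", "boolean", "object", "array", "number", "string", "integer"}


def find_unknown_types(schema: dict) -> dict:
    """Return {property_name: type} for top-level properties with an unknown type."""
    return {
        name: spec["type"]
        for name, spec in schema.get("properties", {}).items()
        if isinstance(spec.get("type"), str) and spec["type"] not in JSON_SCHEMA_TYPES
    }


# Trimmed copy of the schema from the post (descriptions omitted).
schema = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "body": {"type": "float"},
        "timestamp": {"type": "datetime"},
    },
}

unknown = find_unknown_types(schema)
print(unknown)
```

Note also that in the sample messages "body" arrives as a string such as "19.3", so the declared type has to match what the producer really sends.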

Can you clarify why you're using both RabbitMQ and Kafka? If it's your own application writing to RabbitMQ, why not just write to Kafka instead? Or just consume from RabbitMQ directly?

Looking at your Python code, I think you're still trying to handle the JSON Schema payload as raw JSON, when it isn't:

data = json.loads(record_value)

I would start with the example here and build from there.