Python / Avro Serialised payload onto Topic

Hi all

Bit stuck and not sure if I’m doing something wrong or simply misunderstanding…

Previously I wrote a Golang producer which serialised a payload (with Schema Registry).
I was still able to view the payload in Control Center/Topic/Messages.

Now I've written one in Python and I'm just getting gobbledygook… :wink: on my topic.
I’ve simplified my code from the bigger app into the attached.

Strangely enough, the messages are making it onto the topic with the schema enabled…

It starts with runX.sh

#!/bin/bash

export RECORDS=100000
export FLUSHER=1000

export MODE=1
export SITES_MIN=101
export SITES_MAX=106

export DEVICE_MIN=1001
export DEVICE_MAX=1100

export SENSOR_MIN=10001
export SENSOR_MAX=11001

export MEASUREMENT_MIN=980.5
export MEASUREMENT_MAX=1055.7

python3 main.py

main.py

import random, time, os, socket, json
from datetime import datetime, timezone

from confluent_kafka import avro, Producer
from confluent_kafka.avro import CachedSchemaRegistryClient, MessageSerializer


def generate_random_location(base_lat, base_long, lat_range=0.01, long_range=0.01):
    
    random_lat  = base_lat + random.uniform(-lat_range, lat_range)
    random_long = base_long + random.uniform(-long_range, long_range)
    
    return {"latitude": random_lat, "longitude": random_long}

#end 


def serializer(config):

    # Initialize Schema Registry Client
    schema_registry_client = CachedSchemaRegistryClient(config["schema.registry.url"])
        
    key_schema_str = """
        {
            "type": "int"
        }
        """
        
    # Read Schema file
    path   = os.path.realpath(os.path.dirname(__file__))
    with open(f"{path}/{config["schema.file"] }") as f:
        value_schema_str = f.read()
        
    # Serializer
    serializer = MessageSerializer(registry_client     = schema_registry_client,
                                   reader_key_schema   = key_schema_str,
                                   reader_value_schema = value_schema_str)
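    # Note: the reader_* schemas above are only consulted when decoding;
    # when producing, the writer schema is passed per call to
    # encode_record_with_schema() further down.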
        
    return serializer, key_schema_str, value_schema_str
#end


def createKafkaProducer(config):


    conf = {
        "bootstrap.servers": config["bootstrap.servers"],
        "client.id":         socket.gethostname()
    }

    # Create a standard Kafka producer (for raw key publishing)    
    return Producer(conf)
    
#end


def createPayload(config):
    
    # Get current time in milliseconds
    current_time_millis = int(time.time() * 1000)

    # Convert milliseconds to a timezone-aware datetime in UTC (to match the "Z" suffix below)
    dt_object = datetime.fromtimestamp(current_time_millis / 1000, tz=timezone.utc)
    
    if config["mode"] > 0 :
        # Convert to string in ISO format
        ts_human = dt_object.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"  # Trim to milliseconds
        # get a gps location
        location = generate_random_location(config["base_latitude"], config["base_longitude"])
        
    else:
        ts_human = None
        location = None

    if config["mode"] > 1:
        deviceType =  "Pressure"  # Nullable

    else:
        deviceType = None  # Nullable

        
    # Generate a random key integer
    siteId  = random.randint(config["sites_min"], config["sites_max"])
    
    payload = {
        "ts": current_time_millis,
        "metadata": {
            "siteId":     siteId,
            "deviceId":   random.randint(config["device_min"], config["device_max"]),
            "sensorId":   random.randint(config["sensor_min"], config["sensor_max"]),
            "unit":       "Psi",
            "ts_human":   ts_human,    # Nullable
            "location":   location,    # Nullable
            "deviceType": deviceType   # Nullable
        },
        "measurement": random.uniform(config["measurement_min"], config["measurement_max"])
    }

    return payload, siteId

#end


class Payload(object):
    
    """
    Payload record

        ts (integer): 
        metadata (dict): 
            siteId (integer)
            deviceId (integer)
            sensorId (integer)
            unit (string)
            ts_human (string)
            location (dict)
                longitude (double)
                latitude (double)
            deviceType (string)
        measurement (double)
        
    """

    def __init__(self, ts, metadata, measurement):
        self.ts                 = ts
        self.metadata           = metadata
        self.measurement        = measurement

# end


class Location(object):
    
    def __init__(self, longitude, latitude):
        self.longitude  = longitude
        self.latitude   = latitude

    def to_dict(self):
        return {
            "longitude":    self.longitude
           ,"latitude":     self.latitude
        }
# end


class Metadata(object):
    
    def __init__(self, siteId, deviceId, sensorId, unit, ts_human=None, location=None, deviceType=None):
        self.siteId     = siteId
        self.deviceId   = deviceId
        self.sensorId   = sensorId
        self.unit       = unit
        self.ts_human   = ts_human
        self.location   = location
        self.deviceType = deviceType

    def to_dict(self):
        
        # Compulsory fields
        data = {
            "siteId":   self.siteId,
            "deviceId": self.deviceId,
            "sensorId": self.sensorId,
            "unit":     self.unit,
        }

        # Optional Fields
        # Conditionally add `ts_human` => Human readable timestamp
        if self.ts_human:
            data["ts_human"] = self.ts_human

        # Conditionally add `location` if it exists
        if self.location:
            data["location"] = self.location.to_dict()

        # Conditionally add `deviceType` if it exists
        if self.deviceType:
            data["deviceType"] = self.deviceType

        return data
# end


def main(config):
    
    n       = 0
    total   = 0

    avro_serializer, key_schema_str, value_schema_str = serializer(config)

    producer = createKafkaProducer(config)

    # Produce the requested number of records
    for i in range(config["records"]):

        payload, siteId = createPayload(config)
        
        key_serialized = avro_serializer.encode_record_with_schema(
            topic  = config["topic"], 
            schema = avro.loads(key_schema_str), 
            record = siteId,
            is_key = True
        )
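        # encode_record_with_schema() registers the schema under the
        # "<topic>-key" / "<topic>-value" subject and prepends the Confluent
        # wire format (magic byte 0 + 4-byte schema ID) to the Avro binary.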
        
        value_serialized = avro_serializer.encode_record_with_schema(
            topic  = config["topic"], 
            schema = avro.loads(value_schema_str), 
            record = dict(
                    ts          = payload["ts"],
                    metadata    = dict(payload["metadata"]),  # already a dict here; copied defensively
                    measurement = payload["measurement"]
            )     
        )
                
        producer.produce(
            topic = config["topic"], 
            key   = key_serialized, 
            value = value_serialized
        )
        
        n     = n + 1
        total = total + 1
        
        if n % config["flusher"] == 0:

            print(f"Messages sent to Kafka topic: {config["topic"]}, Records: {total}")
            n = 0
            producer.flush()  # Ensure message is sent

            #print(f"Messages sent to Kafka with key: {key} val: {value}")
        #end
# end main


if __name__ == "__main__":

    # If 0 then we just want to keep running, so set it to a crazy large number;
    # it will end eventually...
    records = int(os.environ["RECORDS"])
    if records == 0:
        records = 1000000000000000000000000
        
        
    # Kafka and Schema Registry configuration
    config = {
        "bootstrap.servers":   "localhost:9092",         # Replace with actual broker
        "schema.registry.url": "http://localhost:9081",  # Replace with actual schema registry
        "topic":               "factory_iot",
        "schema.file":         "factory_iot.avsc",
        "base_latitude":        -26.195246,
        "base_longitude":       28.034088,
        "records":              records,
        "flusher":              int(os.environ["FLUSHER"]),
        "mode":                 int(os.environ["MODE"]),
        "sites_min":            int(os.environ["SITES_MIN"]),
        "sites_max":            int(os.environ["SITES_MAX"]),
        "device_min":           int(os.environ["DEVICE_MIN"]),
        "device_max":           int(os.environ["DEVICE_MAX"]),
        "sensor_min":           int(os.environ["SENSOR_MIN"]),
        "sensor_max":           int(os.environ["SENSOR_MAX"]),
        "measurement_min":      float(os.environ["MEASUREMENT_MIN"]),
        "measurement_max":      float(os.environ["MEASUREMENT_MAX"])
    }
    
    main(config)
    
# THE END ... ;)
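Side note: confluent_kafka.avro, used above, is the older, now-deprecated API. For comparison, a minimal sketch of the same produce path on the newer confluent_kafka.schema_registry API might look roughly like this (same topic and URLs assumed; payload and siteId are the pair returned by createPayload() above; error handling omitted):

import socket
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

sr_client = SchemaRegistryClient({"url": "http://localhost:9081"})

with open("factory_iot.avsc") as f:
    value_serializer = AvroSerializer(sr_client, f.read())
key_serializer = AvroSerializer(sr_client, '"int"')  # plain Avro int key, assuming primitive schemas are accepted

producer = Producer({"bootstrap.servers": "localhost:9092", "client.id": socket.gethostname()})

producer.produce(
    topic = "factory_iot",
    key   = key_serializer(siteId, SerializationContext("factory_iot", MessageField.KEY)),
    value = value_serializer(payload, SerializationContext("factory_iot", MessageField.VALUE))
)
producer.flush()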

creTopics.sh

#!/bin/bash

export COMPOSE_PROJECT_NAME=devlab

docker exec broker kafka-topics \
 --create --topic factory_iot \
 --bootstrap-server localhost:9092 \
 --partitions 1 \
 --replication-factor 1

# Let's list topics, excluding the default Confluent Platform topics
docker exec broker kafka-topics \
 --bootstrap-server localhost:9092 \
 --list | grep -v '_confluent' | grep -v '__' | grep -v '_schemas' | grep -v 'default' | grep -v 'docker-connect'

 ./factory_iot.sh
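A describe can be bolted on afterwards to sanity-check the new topic (an extra step, not in the original script):

docker exec broker kafka-topics \
 --describe --topic factory_iot \
 --bootstrap-server localhost:9092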

factory_iot.sh

#!/bin/bash

# Register schemas on local topics
schema=$(cat ./factory_iot.avsc | sed 's/\"/\\\"/g' | tr -d "\n\r")
SCHEMA="{\"schema\": \"$schema\", \"schemaType\": \"AVRO\"}"
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data "$SCHEMA" \
  http://localhost:9081/subjects/factory_iot-value/versions
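If registration succeeds, the registry echoes back a schema ID; the subject can also be double-checked afterwards with a GET against the standard Schema Registry REST endpoint:

curl http://localhost:9081/subjects/factory_iot-value/versions/latest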

The Schema File / factory_iot.avsc

{
    "type": "record",
    "name": "SensorReading",
    "fields": [
        {"name": "ts", "type": "int"},
        {
            "name": "metadata",
            "type": {
                "type": "record",
                "name": "Metadata",
                "fields": [
                    {"name": "siteId", "type": "int"},
                    {"name": "deviceId", "type": "int"},
                    {"name": "sensorId", "type": "int"},
                    {"name": "unit", "type": "string"},
                    {"name": "ts_human", "type": ["null", "string"], "default": null},
                    {
                        "name": "location",
                        "type": [
                            "null",
                            {
                                "type": "record",
                                "name": "Location",
                                "fields": [
                                    {"name": "latitude", "type": "double"},
                                    {"name": "longitude", "type": "double"}
                                ]
                            }
                        ],
                        "default": null
                    },
                    {"name": "deviceType", "type": ["null", "string"], "default": null}
                ]
            }
        },
        {"name": "measurement", "type": "double"}
    ]
}

See the attached diagram for an example of the garbled output.

Thinking it's my schema def…

{
    "name": "factory_iot",
    "namespace": "factory",
    "doc": "Factory Telemetry/IoT measurements",
    "type": "record",
    "fields": [
        {"name": "ts", "type": "int"},
        {"name": "metadata", "type": [
                {"type": "record",
                 "name": "metadata",
                 "fields": [
                    {"name": "siteId","type": "int"},
                    {"name": "deviceId","type": "int"},
                    {"name": "sensorId","type": "int"},
                    {"name": "unit","type": "string"},
                    {"name": "ts_human","type": ["null", "string"], "default": null},
                    {"name": "location", "type": ["null",
                        {"name": "location","type": "record",
                            "fields": [
                                {"name": "latitude", "type": "double"},
                                {"name": "longitude", "type": "double"}
                            ]
                        }
                    ], "default": null},
                    {"name": "deviceType", "type": ["null","string"], "default": null}
                 ]
                }
            ]
        },
        {"name": "measurement", "type": "double"}
    ]
}

and the payload; note that ts_human, location and deviceType can be omitted.

{
    "timestamp" : "2024-10-02T00:00:00.869Z",
    "metadata" : {
        "siteId" : 1009,
        "deviceId" : 1042,
        "sensorId" : 10180,
        "unit" : "Psi",
        "ts_human" : "2024-10-02T00:00:00.869Z",
        "location": {
            "latitude": -26.195246, 
            "longitude": 28.034088
        },
        "deviceType" : "Oil Pump"
    },
    "measurement" : 1013.3997
}

problem solved…

I defined my Avro schema with a field called “ts”, as per above, of type int. Avro’s int is a 32-bit signed value, so it cannot store a milliseconds-since-epoch timestamp; the field needs to be a long.

Discovered this by using kcat to try and consume the messages, which gave a proper error that hinted at the problem.
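For anyone following along, a kcat invocation along these lines will deserialise through the registry and surface exactly this kind of decoding error (standard kcat Avro flags; adjust hosts/ports to your lab):

kcat -C -b localhost:9092 -t factory_iot \
  -s key=avro -s value=avro \
  -r http://localhost:9081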

G

New Schema

{
    "name": "factory_iot",
    "namespace": "factory",
    "doc": "Factory Telemetry/IoT measurements",
    "type": "record",
    "fields": [
        {"name": "ts", "type": "long"},
        {"name": "metadata", "type": [
                {"type": "record",
                 "name": "metadata",
                 "fields": [
                    {"name": "siteId","type": "int"},
                    {"name": "deviceId","type": "int"},
                    {"name": "sensorId","type": "int"},
                    {"name": "unit","type": "string"},
                    {"name": "ts_human","type": ["null", "string"], "default": null},
                    {"name": "location", "type": ["null",
                        {"name": "location","type": "record",
                            "fields": [
                                {"name": "latitude", "type": "double"},
                                {"name": "longitude", "type": "double"}
                            ]
                        }
                    ], "default": null},
                    {"name": "deviceType", "type": ["null", "string"], "default": null}
                    ]
                }
            ]
        },
        {"name": "measurement", "type": "double"}
    ]
}
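For what it's worth, this class of bug can also be caught before anything hits the topic: Avro's int tops out at 2147483647, while the current epoch-millis value is around 1.7 trillion. A quick local check with fastavro (my choice of validator, not part of the original setup) makes the mismatch obvious:

import json, time
from fastavro.validation import validate

with open("factory_iot.avsc") as f:
    schema = json.load(f)

record = {
    "ts": int(time.time() * 1000),  # ~1.7e12, far beyond int's max of 2147483647
    "metadata": {"siteId": 101, "deviceId": 1001, "sensorId": 10001, "unit": "Psi"},
    "measurement": 1013.25
}

# Raises ValidationError while "ts" is declared as int; passes once it is long
validate(record, schema)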