Hi all
A bit stuck, and not sure whether I'm doing something wrong or simply misunderstanding something…
I previously wrote a Golang producer that serialised a payload (using Schema Registry),
and I was still able to view the payload in Control Center → Topic → Messages.
I've now written one in Python, and all I see on my topic is gobbledygook…
I've simplified the code from the bigger app into the snippets below.
Strangely enough, the messages are making it onto the topic with the schema enabled…
Everything starts with runX.sh:
#!/bin/bash
# Launch the Python Avro producer (main.py). Every setting below is exported
# so that main.py can read it from os.environ.

# RECORDS: total messages to produce (0 = keep producing until stopped).
# FLUSHER: flush the producer and print progress every N messages.
export RECORDS=100000 FLUSHER=1000

# MODE: payload detail level.
#   0 = core fields only
#   1 = also populate ts_human and location
#   2 or more = also populate deviceType
export MODE=1

# Inclusive ranges for the random site / device / sensor ids.
export SITES_MIN=101 SITES_MAX=106
export DEVICE_MIN=1001 DEVICE_MAX=1100
export SENSOR_MIN=10001 SENSOR_MAX=11001

# Range for the random (float) measurement value.
export MEASUREMENT_MIN=980.5 MEASUREMENT_MAX=1055.7

python3 main.py
main.py
import json
import os
import random
import socket
import time
from datetime import datetime, timezone

from confluent_kafka import avro, Producer
from confluent_kafka.avro import CachedSchemaRegistryClient, MessageSerializer
def generate_random_location(base_lat, base_long, lat_range=0.01, long_range=0.01):
    """Return a GPS point jittered uniformly around a base coordinate.

    The latitude is drawn from [base_lat - lat_range, base_lat + lat_range]
    and the longitude from [base_long - long_range, base_long + long_range].

    Returns:
        dict with the keys "latitude" and "longitude".
    """
    # Draw the latitude offset first, then the longitude offset.
    lat_offset = random.uniform(-lat_range, lat_range)
    long_offset = random.uniform(-long_range, long_range)
    return {
        "latitude": base_lat + lat_offset,
        "longitude": base_long + long_offset,
    }
# end
def serializer(config):
    """Build the (legacy) Confluent Avro message serializer and load both schemas.

    Args:
        config: dict providing "schema.registry.url" (Schema Registry base URL)
            and "schema.file" (Avro value-schema file name, resolved relative
            to the directory that contains this module).

    Returns:
        (avro_serializer, key_schema_str, value_schema_str): the
        MessageSerializer, the key schema (a bare Avro "int") and the value
        schema, both as JSON strings.

    Raises:
        OSError: if the schema file cannot be opened.
    """
    # NOTE(review): confluent_kafka.avro (CachedSchemaRegistryClient /
    # MessageSerializer) is the legacy, deprecated API. New code should use
    # confluent_kafka.schema_registry.avro.AvroSerializer with a
    # SerializingProducer.
    schema_registry_client = CachedSchemaRegistryClient(config["schema.registry.url"])

    # Keys are the numeric siteId.
    key_schema_str = '{"type": "int"}'

    # Resolve the schema next to this module so the script works from any cwd.
    # (Plain os.path.join instead of the previous nested-quote f-string, which
    # is a SyntaxError before Python 3.12.)
    module_dir = os.path.realpath(os.path.dirname(__file__))
    schema_path = os.path.join(module_dir, config["schema.file"])
    with open(schema_path, encoding="utf-8") as schema_file:
        value_schema_str = schema_file.read()

    # The reader schemas are only used when decoding. The legacy serializer
    # expects parsed Schema objects there (it calls .to_json() on them), not
    # raw strings.
    avro_serializer = MessageSerializer(
        registry_client=schema_registry_client,
        reader_key_schema=avro.loads(key_schema_str),
        reader_value_schema=avro.loads(value_schema_str),
    )
    return avro_serializer, key_schema_str, value_schema_str
# end
def createKafkaProducer(config):
    """Create a plain confluent_kafka Producer that sends pre-encoded bytes.

    No serializers are configured on the producer. The caller (main) passes
    keys and values that were already Avro-encoded.

    Args:
        config: dict providing "bootstrap.servers".
    """
    producer_settings = {"bootstrap.servers": config["bootstrap.servers"]}
    # client.id identifies this client in broker-side request logs/metrics.
    producer_settings["client.id"] = socket.gethostname()
    return Producer(producer_settings)
# end
def createPayload(config):
    """Build one random sensor reading.

    Args:
        config: dict with "mode", "sites_min"/"sites_max",
            "device_min"/"device_max", "sensor_min"/"sensor_max" and
            "measurement_min"/"measurement_max"; "base_latitude" and
            "base_longitude" are also needed when mode > 0.
            mode > 0 populates ts_human and location; mode > 1 also
            populates deviceType. Unpopulated optional fields are None.

    Returns:
        (payload, siteId): the record dict and its siteId (main() uses the
        siteId as the message key).
    """
    # Epoch milliseconds. This exceeds the 32-bit range, so the Avro field
    # that carries it must be a long.
    current_time_millis = int(time.time() * 1000)

    if config["mode"] > 0:
        # Render in UTC. The trailing "Z" asserts UTC, and a naive
        # fromtimestamp() would return local time and mislabel it.
        dt_object = datetime.fromtimestamp(current_time_millis / 1000, tz=timezone.utc)
        # %f gives microseconds; drop the last 3 digits to keep milliseconds.
        ts_human = dt_object.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
        location = generate_random_location(config["base_latitude"], config["base_longitude"])
    else:
        ts_human = None
        location = None

    deviceType = "Pressure" if config["mode"] > 1 else None  # Nullable

    siteId = random.randint(config["sites_min"], config["sites_max"])
    payload = {
        "ts": current_time_millis,
        "metadata": {
            "siteId": siteId,
            "deviceId": random.randint(config["device_min"], config["device_max"]),
            "sensorId": random.randint(config["sensor_min"], config["sensor_max"]),
            "unit": "Psi",
            "ts_human": ts_human,      # Nullable
            "location": location,      # Nullable
            "deviceType": deviceType,  # Nullable
        },
        "measurement": random.uniform(config["measurement_min"], config["measurement_max"]),
    }
    return payload, siteId
# end
class Payload(object):
    """Top-level sensor reading (the "SensorReading" Avro record).

    Attributes:
        ts (int): event time in epoch milliseconds (exceeds the 32-bit range,
            so it needs an Avro ``long``).
        metadata (dict or Metadata-like object): siteId, deviceId, sensorId
            (int), unit (str), plus optional ts_human (ISO-8601 str),
            location (latitude/longitude floats) and deviceType (str).
        measurement (float): the sensor value (Avro ``double``).

    NOTE(review): main() sends the plain dicts built by createPayload() and
    never instantiates this class.
    """

    def __init__(self, ts, metadata, measurement):
        self.ts = ts
        self.metadata = metadata
        self.measurement = measurement

    def to_dict(self):
        """Return the record as a plain dict.

        metadata is converted via its own to_dict() when it has one, and is
        passed through unchanged otherwise (for example, when it is already a
        dict).
        """
        metadata = self.metadata
        if hasattr(metadata, "to_dict"):
            metadata = metadata.to_dict()
        return {"ts": self.ts, "metadata": metadata, "measurement": self.measurement}
# end
class Location(object):
    """A GPS coordinate pair that can be converted to a plain dict."""

    def __init__(self, longitude, latitude):
        # Positional order is (longitude, latitude), which is the reverse of
        # the schema's field order. Pass keywords to avoid mix-ups.
        self.longitude, self.latitude = longitude, latitude

    def to_dict(self):
        """Return {"longitude": ..., "latitude": ...}."""
        return dict(longitude=self.longitude, latitude=self.latitude)
# end
class Metadata(object):
    """Device/sensor metadata for a reading (the "Metadata" Avro record).

    siteId, deviceId, sensorId and unit are always emitted. ts_human,
    location and deviceType are optional: falsy values (None, "", empty
    mapping) are normalised to None and left out of to_dict().
    """

    def __init__(self, siteId, deviceId, sensorId, unit, ts_human=None, location=None, deviceType=None):
        self.siteId = siteId
        self.deviceId = deviceId
        self.sensorId = sensorId
        self.unit = unit
        # Treat empty values the same as "not provided".
        self.ts_human = ts_human or None
        self.location = location or None
        self.deviceType = deviceType or None

    def to_dict(self):
        """Return the metadata as a plain dict.

        location may be an object with a to_dict() method (e.g. Location) or
        a plain mapping such as the one generate_random_location() returns.
        Omitted optional keys rely on the schema's null defaults.
        NOTE(review): confirm that the serializer in use fills defaults for
        missing keys.
        """
        # Compulsory fields
        data = {
            "siteId": self.siteId,
            "deviceId": self.deviceId,
            "sensorId": self.sensorId,
            "unit": self.unit,
        }
        # Optional fields: human-readable timestamp
        if self.ts_human:
            data["ts_human"] = self.ts_human
        # Optional: location. Calling .to_dict() unconditionally crashed on dicts.
        if self.location:
            loc = self.location
            data["location"] = loc.to_dict() if hasattr(loc, "to_dict") else dict(loc)
        # Optional: device type
        if self.deviceType:
            data["deviceType"] = self.deviceType
        return data
# end
def main(config):
    """Produce config["records"] random readings to config["topic"] as Avro.

    Each record's key is its siteId, encoded with the int key schema. Its
    value is the payload from createPayload(), encoded with the value schema
    loaded from config["schema.file"]. Both are encoded via
    avro_serializer.encode_record_with_schema.

    Every config["flusher"] records the producer is flushed and progress is
    printed (a flusher <= 0 disables the periodic flush). A final flush
    always runs, including on Ctrl-C, so the last partial batch is not lost.
    """
    avro_serializer, key_schema_str, value_schema_str = serializer(config)
    producer = createKafkaProducer(config)
    topic = config["topic"]
    flusher = config["flusher"]

    # Parse each schema once instead of once per record.
    key_schema = avro.loads(key_schema_str)
    value_schema = avro.loads(value_schema_str)

    total = 0
    try:
        for _ in range(config["records"]):
            payload, siteId = createPayload(config)
            key_serialized = avro_serializer.encode_record_with_schema(
                topic=topic,
                schema=key_schema,
                record=siteId,
                is_key=True,
            )
            # payload already holds exactly ts / metadata / measurement as plain dicts.
            value_serialized = avro_serializer.encode_record_with_schema(
                topic=topic,
                schema=value_schema,
                record=payload,
            )
            producer.produce(topic=topic, key=key_serialized, value=value_serialized)
            # Serve delivery reports without blocking.
            producer.poll(0)

            total += 1
            if flusher > 0 and total % flusher == 0:
                producer.flush()  # wait until this batch is delivered
                print(f"Messages sent to Kafka topic: {topic}, Records: {total}")
    finally:
        # produce() is asynchronous: anything still queued when the process
        # exits is lost, so drain the tail (total % flusher records) here.
        producer.flush()
# end main
if __name__ == "__main__":
    # Run-time settings come from the environment (see runX.sh). Connection
    # endpoints default to the local dev stack and can be overridden with
    # KAFKA_BOOTSTRAP_SERVERS / SCHEMA_REGISTRY_URL.
    records = int(os.environ["RECORDS"])
    if records == 0:
        # 0 means "keep running": use an effectively endless record count.
        records = 1000000000000000000000000

    config = {
        "bootstrap.servers": os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "schema.registry.url": os.environ.get("SCHEMA_REGISTRY_URL", "http://localhost:9081"),
        "topic": "factory_iot",
        "schema.file": "factory_iot.avsc",
        "base_latitude": -26.195246,
        "base_longitude": 28.034088,
        "records": records,
        "flusher": int(os.environ["FLUSHER"]),
        "mode": int(os.environ["MODE"]),
        "sites_min": int(os.environ["SITES_MIN"]),
        "sites_max": int(os.environ["SITES_MAX"]),
        "device_min": int(os.environ["DEVICE_MIN"]),
        "device_max": int(os.environ["DEVICE_MAX"]),
        "sensor_min": int(os.environ["SENSOR_MIN"]),
        "sensor_max": int(os.environ["SENSOR_MAX"]),
        "measurement_min": float(os.environ["MEASUREMENT_MIN"]),
        "measurement_max": float(os.environ["MEASUREMENT_MAX"]),
    }
    main(config)
# THE END ... ;)
creTopics.sh
#!/bin/bash
# Create the factory_iot topic inside the "broker" container, list the
# non-internal topics, then register the value schema.
# NOTE(review): only docker compose reads COMPOSE_PROJECT_NAME; the
# docker exec calls below do not use it.
export COMPOSE_PROJECT_NAME=devlab

# --if-not-exists makes re-running this script harmless.
docker exec broker kafka-topics \
  --create --topic factory_iot \
  --if-not-exists \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1

# List topics, hiding the default Confluent Platform / internal topics.
docker exec broker kafka-topics \
  --bootstrap-server localhost:9092 \
  --list | grep -v -e '_confluent' -e '__' -e '_schemas' -e 'default' -e 'docker-connect'

# Register the Avro value schema. Resolve the script next to this one so
# this works from any working directory.
"$(dirname "$0")/factory_iot.sh"
factory_iot.sh
#!/bin/bash
# Register the factory_iot value schema in Schema Registry under the
# "factory_iot-value" subject.
set -euo pipefail

# Resolve the schema next to this script, not relative to the caller's cwd.
schema_file="$(dirname "$0")/factory_iot.avsc"

# Embed the schema as a JSON string:
#   - escape double quotes
#   - drop CR/LF
#   - turn tabs into spaces (raw tabs are invalid inside JSON strings)
# NOTE(review): backslashes in the schema are not escaped.
schema=$(sed 's/\"/\\\"/g' "$schema_file" | tr -d "\n\r" | tr '\t' ' ')
SCHEMA="{\"schema\": \"$schema\", \"schemaType\": \"AVRO\"}"

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data "$SCHEMA" \
  http://localhost:9081/subjects/factory_iot-value/versions
echo
The schema file (factory_iot.avsc):
{
  "type": "record",
  "name": "SensorReading",
  "doc": "One factory IoT sensor reading.",
  "fields": [
    {
      "name": "ts",
      "type": "long",
      "doc": "Event time in epoch milliseconds. Must be long: millisecond timestamps exceed the 32-bit int range."
    },
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "Metadata",
        "fields": [
          {"name": "siteId", "type": "int"},
          {"name": "deviceId", "type": "int"},
          {"name": "sensorId", "type": "int"},
          {"name": "unit", "type": "string"},
          {
            "name": "ts_human",
            "type": ["null", "string"],
            "default": null,
            "doc": "Optional human-readable ISO-8601 timestamp with milliseconds."
          },
          {
            "name": "location",
            "type": [
              "null",
              {
                "type": "record",
                "name": "Location",
                "fields": [
                  {"name": "latitude", "type": "double"},
                  {"name": "longitude", "type": "double"}
                ]
              }
            ],
            "default": null,
            "doc": "Optional GPS position."
          },
          {"name": "deviceType", "type": ["null", "string"], "default": null}
        ]
      }
    },
    {
      "name": "measurement",
      "type": "double",
      "doc": "Sensor value, in the unit given by metadata.unit."
    }
  ]
}
See the attached diagram for an example of the garbled output.