I have this test application, which successfully creates a table (for a push query; not a CTAS for pull queries) and then invokes the push query over the ksqlDB REST API when run with main.py --createTables.
To reproduce the problem, I have created data_generator.py to populate a topic with data.
import asyncio
import json
import random
import time

from aiokafka import AIOKafkaProducer


# Asynchronous generator yielding random temperature records
async def generate_temperature_records(n=100):
    locations = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"]
    for _ in range(n):
        record = {
            "temperature": round(random.uniform(-20, 40), 2),  # Temperature in Celsius
            "location": random.choice(locations),
            "time_of_measurement": int(time.time() * 1000)  # Current time in milliseconds
        }
        yield record


# Asynchronous function to produce temperature records to Kafka
async def produce_to_kafka():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092',  # Kafka broker connection details
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # Serialize values as JSON
        key_serializer=lambda k: str(k).encode('utf-8')  # Serialize keys as strings
    )
    await producer.start()
    try:
        async for record in generate_temperature_records():
            key = record['location']  # Use location as the key
            await producer.send_and_wait("temperature_measurements", key=key, value=record)
            print("Record produced to Kafka:", record)
            await asyncio.sleep(random.uniform(0.1, 5))  # Arbitrary delay between 0.1 and 5 seconds
    finally:
        await producer.stop()


# Asynchronous main function to run the producer
async def main():
    await produce_to_kafka()


if __name__ == "__main__":
    asyncio.run(main())
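To rule out the producer side, a minimal aiokafka consumer can confirm that records actually land on the topic. This is just a verification sketch, assuming the same broker address and topic name as above:

import asyncio
import json

from aiokafka import AIOKafkaConsumer


# Minimal consumer to verify that data_generator.py writes to the topic
async def verify_topic():
    consumer = AIOKafkaConsumer(
        "temperature_measurements",          # Same topic as the producer above
        bootstrap_servers="localhost:9092",  # Same broker as the producer above
        auto_offset_reset="earliest",        # Read the topic from the beginning
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    await consumer.start()
    try:
        async for msg in consumer:
            # Keys were produced as raw strings, so decode the bytes manually
            print(f"key={msg.key.decode('utf-8')} value={msg.value}")
    finally:
        await consumer.stop()


if __name__ == "__main__":
    asyncio.run(verify_topic())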
Then there is main.py, which creates the ksqlDB table and invokes the push query in order to consume the continuous results. Run it with py main.py --createTables the first time.
import httpx
import asyncio
import json
import argparse

# Define the ksqlDB server URLs (the /query-stream endpoint supports HTTP/2)
ksqldb_url = 'http://localhost:8088/ksql'
query_stream_url = 'http://localhost:8088/query-stream'


async def create_table():
    # Step 1: Create the ksqlDB table with a primary key
    create_table_statement = """
    CREATE TABLE IF NOT EXISTS temperature_measurements (
        location STRING PRIMARY KEY,
        temperature DOUBLE,
        time_of_measurement BIGINT
    ) WITH (
        KAFKA_TOPIC='temperature_measurements',
        VALUE_FORMAT='JSON',
        PARTITIONS=1
    );
    """
    # Send the POST request to create the table
    async with httpx.AsyncClient() as client:
        response = await client.post(ksqldb_url, json={"ksql": create_table_statement, "streamsProperties": {}})

    # Check if the request was successful
    if response.status_code == 200:
        print("Table created successfully.")
        n_sleep = 60
        print(f"Sleeping for {n_sleep} seconds before querying the table.")
        await asyncio.sleep(n_sleep)  # Wait for 60 seconds to allow data propagation
    else:
        print(f"Failed to create table: {response.status_code}")
        print(response.json())


async def execute_push_query():
    push_query = "SELECT * FROM temperature_measurements EMIT CHANGES;"
    print(f"Starting to run Continuous/Push Query: {push_query}")
    headers = {
        "Content-Type": "application/vnd.ksql.v1+json"
    }
    body = {"sql": push_query, "streamsProperties": {}}
    async with httpx.AsyncClient(http2=True) as client:
        try:
            response = await client.post(query_stream_url, headers=headers, json=body, timeout=None)
            print(f"HTTP Version: {response.http_version}")
            print(f"Response Status: {response.status_code}")
            print(f"Response Headers: {response.headers}")
            if response.status_code == 200:
                # Print the query results
                async for message in response.aiter_lines():
                    if message:
                        try:
                            result = json.loads(message)
                            print("Query result:", result)
                        except json.JSONDecodeError:
                            print("Received non-JSON message:", message)
            else:
                print(f"Failed to execute query: {response.status_code}")
                print(response.json())
        except Exception as e:
            print(f"Exception occurred: {e}")


def main():
    parser = argparse.ArgumentParser(description="Control whether to create the table.")
    parser.add_argument('--createTables', action='store_true', default=False, help="Create table if specified.")
    args = parser.parse_args()
    if args.createTables:
        print("Creating table.")
        asyncio.run(create_table())
    else:
        print("Omitting table creation.")
    asyncio.run(execute_push_query())


if __name__ == "__main__":
    main()
For whatever reason, this script never receives a response (nor an error), whereas the following cURL commands do continuously consume the freshly produced records.
HTTP/2 URL and syntax:
curl --http2 -X "POST" "http://localhost:8088/query-stream" -d '{"sql": "SELECT * FROM temperature_measurements EMIT CHANGES;", "streamsProperties": {}}'
HTTP/1.1 URL and syntax:
curl --http1.1 -X "POST" "http://localhost:8088/query" -H "Accept: application/vnd.ksql.v1+json" -d '{ "ksql": "SELECT * FROM temperature_measurements EMIT CHANGES;", "streamsProperties": {}}'
Why can curl receive the data over both HTTP/1.1 and HTTP/2, while main.py cannot? Does anyone have a working Python snippet for comparison?
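For what it's worth, one difference I noticed while reading the httpx docs: client.post() reads the entire response body before returning, which can never complete for an unbounded push query, whereas client.stream() hands over the response as soon as the headers arrive. Here is a sketch of the push query rewritten against that streaming API (same endpoint as above; I have not verified this end to end):

import asyncio
import json

import httpx

query_stream_url = 'http://localhost:8088/query-stream'


async def execute_push_query_streaming():
    body = {"sql": "SELECT * FROM temperature_measurements EMIT CHANGES;", "streamsProperties": {}}
    async with httpx.AsyncClient(http2=True) as client:
        # client.stream() yields the response once the headers arrive,
        # instead of buffering the whole (unbounded) body like client.post()
        async with client.stream("POST", query_stream_url, json=body, timeout=None) as response:
            print(f"HTTP Version: {response.http_version}, Status: {response.status_code}")
            async for line in response.aiter_lines():
                if line:
                    # /query-stream emits one JSON document per line
                    print("Query result:", json.loads(line))


if __name__ == "__main__":
    asyncio.run(execute_push_query_streaming())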
To quickly set up the ksqlDB environment, I can offer this docker-compose.yaml:
version: '3'
services:
  kafka1:
    image: apache/kafka:3.7.0
    container_name: kafka1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'INTERNAL://kafka1:29092,CONTROLLER://kafka1:29093,EXTERNAL://0.0.0.0:9092'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka1:29092,EXTERNAL://localhost:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  kafka2:
    image: apache/kafka:3.7.0
    container_name: kafka2
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'INTERNAL://kafka2:29092,CONTROLLER://kafka2:29093,EXTERNAL://0.0.0.0:9093'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka2:29092,EXTERNAL://localhost:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  kafka3:
    image: apache/kafka:3.7.0
    container_name: kafka3
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'INTERNAL://kafka3:29092,CONTROLLER://kafka3:29093,EXTERNAL://0.0.0.0:9094'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka3:29092,EXTERNAL://localhost:9094'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  schema-registry:
    image: confluentinc/cp-schema-registry
    container_name: kafka-schema-registry
    hostname: schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka1:29092,kafka2:29092,kafka3:29092'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
    depends_on:
      - kafka1
      - kafka2
      - kafka3

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092,kafka2:29092,kafka3:29092
      KAFKA_CLUSTERS_0_KAFKAVERSION: "3.7.0"

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.29.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    ports:
      - "8088:8088"
    environment:
      KSQL_KSQL_SERVICE_ID: KSQL_SERVICE_
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka1:29092,kafka2:29092,kafka3:29092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.29.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  confluent-control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    container_name: confluent-control-center
    hostname: control-center
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka1:29092,kafka2:29092,kafka3:29092'
      CONTROL_CENTER_REPLICATION_FACTOR: 3
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 2
      CONTROL_CENTER_CONNECT_CLUSTER: 'kafka1:29092,kafka2:29092,kafka3:29092'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    depends_on:
      - kafka1
      - kafka2
      - kafka3
      - schema-registry
      - ksqldb-server

  # Flink Job Manager
  flink-jobmanager:
    image: sahri-flink:1.18.0  # You can use the latest stable version
    ports:
      - "8082:8081"  # Port for Flink Web Dashboard
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
      - FLINK_PROPERTIES="jobmanager.rpc.address:flink-jobmanager|taskmanager.numberOfTaskSlots:3|parallelism.default:2"
      - KAFKA_BOOTSTRAP_SERVERS=kafka1:29092,kafka2:29092,kafka3:29092
    stdin_open: true
    tty: true

  # Flink Task Manager
  flink-taskmanager:
    image: sahri-flink:1.18.0  # You can use the latest stable version
    depends_on:
      - flink-jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
      - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=3
      - KAFKA_BOOTSTRAP_SERVERS=kafka1:29092,kafka2:29092,kafka3:29092

# Create the network first: docker network create local-kafka-network
networks:
  local-kafka-network:
    external: true
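After creating the external network (see the comment above), the stack comes up with docker compose up -d. To poke at ksqlDB interactively, a CLI session can be opened like this (the ksqldb-cli container's entrypoint is /bin/sh, so ksql has to be started explicitly):

docker compose up -d
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088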