ksqlDB REST API: cURL works, httpx/requests libraries do not

I have a test application that successfully creates a table (for a push query, not a CTAS for pull queries) when invoked with main.py --createTables, and then runs the push query over the ksqlDB REST API.

To replicate the problem, I created data_generator.py to populate a topic with data.

import asyncio
import json
import random
import time
from aiokafka import AIOKafkaProducer

# Asynchronous function to generate random temperature records
async def generate_temperature_records(n=100):
    locations = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"]
    
    for _ in range(n):
        record = {
            "temperature": round(random.uniform(-20, 40), 2),  # Temperature in Celsius
            "location": random.choice(locations),
            "time_of_measurement": int(time.time() * 1000)  # Current time in milliseconds
        }
        yield record

# Asynchronous function to produce temperature records to Kafka
async def produce_to_kafka():
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092',  # Kafka broker connection details
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # Serialize values as JSON
        key_serializer=lambda k: str(k).encode('utf-8')  # Serialize keys as string
    )
    await producer.start()
    try:
        async for record in generate_temperature_records():
            key = record['location']  # Use location as the key
            await producer.send_and_wait("temperature_measurements", key=key, value=record)
            print("Record produced to Kafka:", record)
            await asyncio.sleep(random.uniform(0.1, 5))  # Arbitrary frequency between 0.1 to 5 seconds
    finally:
        await producer.stop()

# Asynchronous main function to run the producer
async def main():
    await produce_to_kafka()

if __name__ == "__main__":
    asyncio.run(main())
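
Before involving ksqlDB, it can help to confirm that records actually land in the topic. Below is a minimal consumer sketch using the same aiokafka library; the topic name and broker address are taken from the producer above, everything else is illustrative:

import asyncio
import json
from aiokafka import AIOKafkaConsumer

async def verify_topic():
    # Read the topic from the beginning to confirm the producer works
    consumer = AIOKafkaConsumer(
        "temperature_measurements",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(f"key={msg.key}, value={msg.value}")
    finally:
        await consumer.stop()

if __name__ == "__main__":
    asyncio.run(verify_topic())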

Then there is main.py, which creates the ksqlDB table and runs the push query to consume the continuous results. Run it initially with py main.py --createTables.

import httpx
import asyncio
import json
import argparse

# ksqlDB server endpoints: /ksql for statements, /query-stream for streaming queries
ksqldb_url = 'http://localhost:8088/ksql'
query_stream_url = 'http://localhost:8088/query-stream'

async def create_table():
    # Step 1: Create the KSQLDB Table with Primary Key
    create_table_statement = """
    CREATE TABLE IF NOT EXISTS temperature_measurements (
      location STRING PRIMARY KEY,
      temperature DOUBLE,
      time_of_measurement BIGINT
    ) WITH (
      KAFKA_TOPIC='temperature_measurements',
      VALUE_FORMAT='JSON',
      PARTITIONS=1
    );
    """

    # Send the POST request to create the table
    async with httpx.AsyncClient() as client:
        response = await client.post(ksqldb_url, json={"ksql": create_table_statement, "streamsProperties": {}})

        # Check if the request was successful
        if response.status_code == 200:
            print("Table created successfully.")
            n_sleep = 60
            print(f"Sleeping for {n_sleep} seconds before querying the table.")
            await asyncio.sleep(n_sleep)  # Wait for 60 seconds to allow data propagation
        else:
            print(f"Failed to create table: {response.status_code}")
            print(response.json())

async def execute_push_query():
    push_query = "SELECT * FROM temperature_measurements EMIT CHANGES;"
    print(f"Starting to run Continuous/Push Query: {push_query}")

    headers = {
        "Content-Type": "application/vnd.ksql.v1+json"
    }
    body = {"sql": push_query, "streamsProperties": {}}

    async with httpx.AsyncClient(http2=True) as client:
        try:
            response = await client.post(query_stream_url, headers=headers, json=body, timeout=None)

            print(f"HTTP Version: {response.http_version}")
            print(f"Response Status: {response.status_code}")
            print(f"Response Headers: {response.headers}")

            if response.status_code == 200:
                # Print the query results
                async for message in response.aiter_lines():
                    if message:
                        try:
                            result = json.loads(message)
                            print("Query result:", result)
                        except json.JSONDecodeError:
                            print("Received non-JSON message:", message)
            else:
                print(f"Failed to execute query: {response.status_code}")
                print(response.json())
        except Exception as e:
            print(f"Exception occurred: {e}")

def main():
    parser = argparse.ArgumentParser(description="Control whether to create the table.")
    parser.add_argument('--createTables', action='store_true', default=False, help="Create table if specified.")
    args = parser.parse_args()

    if args.createTables:
        print("Creating table.")
        asyncio.run(create_table())
    else:
        print("Omitting table creation.")

    asyncio.run(execute_push_query())

if __name__ == "__main__":
    main()

For whatever reason, this script never receives a response (nor an error), whereas the cURL commands do continuously consume the freshly produced records.

HTTP/2 URL and syntax:

curl --http2 -X "POST" "http://localhost:8088/query-stream" -d '{"sql": "SELECT * FROM temperature_measurements EMIT CHANGES;", "streamsProperties": {}}'

Why can curl receive the data over both HTTP/1.1 and HTTP/2, but main.py cannot?

HTTP/1.1 URL and syntax:

curl --http1.1 -X "POST" "http://localhost:8088/query" -H "Accept: application/vnd.ksql.v1+json" -d '{ "ksql": "SELECT * FROM temperature_measurements EMIT CHANGES;", "streamsProperties": {}}'

Does anyone have a working python snippet for comparison?

To quickly set up the ksqlDB environment, I can offer this docker-compose.yaml:

version: '3'
services:
  kafka1:
    image: apache/kafka:3.7.0
    container_name: kafka1
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'INTERNAL://kafka1:29092,CONTROLLER://kafka1:29093,EXTERNAL://0.0.0.0:9092'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka1:29092,EXTERNAL://localhost:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  kafka2:
    image: apache/kafka:3.7.0
    container_name: kafka2
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'INTERNAL://kafka2:29092,CONTROLLER://kafka2:29093,EXTERNAL://0.0.0.0:9093'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka2:29092,EXTERNAL://localhost:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  kafka3:
    image: apache/kafka:3.7.0
    container_name: kafka3
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_LISTENERS: 'INTERNAL://kafka3:29092,CONTROLLER://kafka3:29093,EXTERNAL://0.0.0.0:9094'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka3:29092,EXTERNAL://localhost:9094'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093,2@kafka2:29093,3@kafka3:29093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'

  schema-registry:
    image: confluentinc/cp-schema-registry
    container_name: kafka-schema-registry
    hostname: schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka1:29092,kafka2:29092,kafka3:29092'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8081'
    depends_on:
      - kafka1
      - kafka2
      - kafka3

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092,kafka2:29092,kafka3:29092
      KAFKA_CLUSTERS_0_KAFKAVERSION: "3.7.0"

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.29.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    ports:
      - "8088:8088"
    environment:
      KSQL_KSQL_SERVICE_ID: KSQL_SERVICE_
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka1:29092,kafka2:29092,kafka3:29092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.29.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
  
  confluent-control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    container_name: confluent-control-center
    hostname: control-center
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka1:29092,kafka2:29092,kafka3:29092'
      CONTROL_CENTER_REPLICATION_FACTOR: 3
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 2
      CONTROL_CENTER_CONNECT_CLUSTER: 'kafka1:29092,kafka2:29092,kafka3:29092'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    depends_on:
      - kafka1
      - kafka2
      - kafka3
      - schema-registry
      - ksqldb-server

  # Flink Job Manager
  flink-jobmanager:
    image: sahri-flink:1.18.0  # You can use the latest stable version
    ports:
      - "8082:8081"  # Port for Flink Web Dashboard
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
      - FLINK_PROPERTIES= "jobmanager.rpc.address:flink-jobmanager|taskmanager.numberOfTaskSlots:3|parallelism.default:2" 
      - KAFKA_BOOTSTRAP_SERVERS=kafka1:29092,kafka2:29092,kafka3:29092
    stdin_open: true
    tty: true


  # Flink Task Manager
  flink-taskmanager:
    image: sahri-flink:1.18.0  # You can use the latest stable version
    depends_on:
      - flink-jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager
      - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=3
      - KAFKA_BOOTSTRAP_SERVERS=kafka1:29092,kafka2:29092,kafka3:29092


# Create the network first: docker network create local-kafka-network
networks:    
  local-kafka-network:  
    external: true

  

I managed to consume the ksqlDB push query with the requests library. The stream argument does the trick. If I find out what the trick is with httpx, I'll post it.

import requests

url = "http://localhost:8088/query-stream"
headers = {
    "Content-Type": "application/vnd.ksql.v1+json",
    "Accept": "application/vnd.ksql.v1+json",
}

data = {
    "sql": "SELECT * FROM stream_temperature_measurements EMIT CHANGES;",
    "streamsProperties": {}
}

# stream=True keeps the connection open and defers reading the body
response = requests.post(url, json=data, headers=headers, stream=True)

if response.status_code == 200:
    try:
        for line in response.iter_lines():
            if line:
                print(line.decode('utf-8'))
    except KeyboardInterrupt:
        print("Interrupted by user")
else:
    print(f"Failed to execute query: {response.status_code}")
    print(response.text)
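
For comparison, the httpx counterpart to requests' stream=True appears to be the client.stream() context manager: a plain client.post() waits for the complete response body, which never arrives for an unbounded push query. Here is a minimal sketch of the same query rewritten with client.stream(), not verified against this exact setup:

import asyncio
import httpx

async def stream_push_query():
    url = "http://localhost:8088/query-stream"
    body = {
        "sql": "SELECT * FROM stream_temperature_measurements EMIT CHANGES;",
        "streamsProperties": {}
    }
    async with httpx.AsyncClient(http2=True, timeout=None) as client:
        # client.stream() yields the response before the body is consumed,
        # so the unbounded push query can be read line by line
        async with client.stream("POST", url, json=body) as response:
            print(f"HTTP Version: {response.http_version}")
            print(f"Response Status: {response.status_code}")
            async for line in response.aiter_lines():
                if line:
                    print(line)

if __name__ == "__main__":
    asyncio.run(stream_push_query())

With the streaming context manager, the headers are available immediately and each row can be printed as ksqlDB emits it.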
