Timeout while waiting for command topic consumer to process command topic

hi. I create some ksql throw API like

CREATE STREAM IF NOT EXISTS input (files ARRAY<STRING>, source STRING, location STRUCT<longitude STRING, latitude STRING>) WITH (KAFKA_TOPIC='sensors', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON'); 
CREATE STREAM IF NOT EXISTS output (files ARRAY<STRING>, source STRING, location STRUCT<longitude STRING, latitude STRING>) WITH (KAFKA_TOPIC='results', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON'); 
CREATE STREAM filteredInput as SELECT * FROM input WHERE source = 'local';
 INSERT INTO output SELECT * FROM filteredInput WHERE (GETHOUR() * 60 + GETMINUTE() >= 0 AND GETHOUR() * 60 + GETMINUTE() < 1439);

if I create just once or with delay of at least 5 seconds - everything good, but if I run some queries like above one by one - I receive error

      message: "Could not write the statement 'CREATE STREAM IF NOT EXISTS input \n" +
        '        (files ARRAY<STRING>, source STRING, location STRUCT<longitude STRING, latitude STRING>) \n' +
        "        WITH (KAFKA_TOPIC='sensors', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON');' into the command topic.\n" +
        'Caused by: Timeout while waiting for command topic consumer to process command\n' +
        '\ttopic',

what the problem is? I don’t understand
I have just single broker, zookeeper, ksqldbserver with no replicas, clusters or smth else

also I have a problem when I run request above and immediately remove it with request

TERMINATE INSERTQUERY_1; DROP STREAM IF EXISTS FILTEREDINPUT DELETE TOPIC; DROP STREAM IF EXISTS OUTPUT; DROP STREAM IF EXISTS INPUT;

it works but only at least after 5 seconds after creation. in another case I receive the same error

  message: "Could not write the statement 'DROP STREAM IF EXISTS FILTEREDINPUT DELETE TOPIC;' into the command topic.\n" +
    'Caused by: Timeout while waiting for command topic consumer to process command\n' +
    '\ttopic',

how I can resolve this problems? my application need to create and remove ksql flows when user request it and it’s possible that he create flow and immediately want to delete it or when multiple users create flows in the same time

what I need to do? maybe some environment variables for dockers or only write my own internal queue for actions and run requests with delay between each of them?

thanks a lot

UPD: all users are working with the same kafka topic ‘sensors’ and ‘results’ but we create new stream for each user request which we remove later. so maybe the problem in that 1 topic can’t normally process creation of some streams in the same time, but it works with delay

This topic was automatically closed after 30 days. New replies are no longer allowed.