KSQL-Kafka Streams

I have Kafka streams as follows:

topic1:
event example in Avro:

{
“ID”: {“string”:“S1”},
“text”:{“string”:“department”}
}
{
“ID”: {“string”:“S2”},
“text”:{“string”:“minimum”}
}

{
“ID”: {“string”:“S3”},
“text”:{“string”:“maximum”}
}
{
“ID”: {“string”:“S4”},
“text”:{“string”:“telephone”}
}

stream1 which is a reference has details on any abbreviations built on topic1:
stream1 definition:
full_name string

example values:
—±----
|ID|text|
—±----
|S1|department
|S2|cellphone
|S3|fax
|S4|telephone


topic2:
event example in Avro:

{
“ID”: {“string”:“S1”},
“value”:{“string”:“department”}
}
{
“ID”: {“string”:“S2”},
“value”:{“string”:“cellphone”}
}

{
“ID”: {“string”:“S3”},
“value”:{“string”:“maximum”}
}
{
“ID”: {“string”:“S4”},
“value”:{“string”:“telephone”}
}

stream2 which is a reference has details on any abbreviations built on topic1:
stream2 definition:
full_name string

example values:
—±-----
|ID|value|
—±-----
|S1|dt
|S2|cp
|S3|fax
|S4|th


topic3:
event example in Avro:
{
“ID_k”: {“string”:“K1”},
“received_abbrevation”:{“string”:“dt”}
}
{
“ID_k”: {“string”:“K2”},
“received_abbrevation”:{“string”:“cp”}
}

stream3 which receives event from topic2.

stream3 definition:

ID string
received_abbrevation string.

example values:
-----±--------------------
|ID_k|received_abbrevation|
-----±--------------------
|K1 |dt |
|K2 |cp |

what I want as a result is, whenever, there is new event occurring in stream3 it should query stream1 and stream2 from row1 till …n always.
and display the “text” value from stream1 by comparing ID of stream1 and stream2 and get the stream2.value as (select received_abbrevation from stream3) display the result as stream4.

note: If there a new event in topic1/stream1, there is no need to query stream2.

the resultant stream should look like:

actual_name
department
cellphone

For example, as in SQL query we can write as below:
select stream1.text as actual_name
from stream1, stream2
where stream1.ID=stream2.ID and stream2.value(select received_abbrevation from stream3);

Kindly let me know how I can achieve this in KSQL.

I guess you should asked you question in ksqlDB - Confluent Community.

“Kafka Streams” is, well, about Kafka Streams, the Java stream processing library of Kafka (Apache Kafka).