Reducing a stream/table

I have two streams (from two different systems, imported via connectors). Some of the information from each stream will be used to build combined information.

Currently, I’m working with ksqlDB but I’m having problems with the last step to reduce the information from both streams.

Both streams contain a tree structure (id/parentId), so I’ve used a second table for each stream to look up certain information from the parents. That is then joined into a table containing all the information needed for the final reduce.

The main matching column is always the same. However, one or more additional columns (which ones is not fixed) are also needed for the final match, and the values in those columns might only partially match.

An example output of the table might look like this:

| id | match | matchExtra1 | matchExtra2 | matchExtra3 |
|  1 |     1 |      Extra1 |      Extra2 |      Extra3 |
|  2 |     1 |      Extra1 |      Extra4 |      Extra5 |
|  3 |     1 |      Extra6 |      Extra7 |      Extra8 |
|  4 |     1 |      Extra9 |      Extr10 |        tra8 |

In this case, ids 1 and 2 should be matched, and ids 3 and 4 should form another match.

If this is possible within ksqlDB, that would be great. If I need to drop down to low-level Kafka, that’s fine as long as we can achieve the end result.

Not sure if I can follow. Seems you are using the term “stream” quite loosely and you don’t necessarily mean a STREAM in ksqlDB? For example you say:

I’ve used a second table for each stream

It’s not clear to me if you refer to a STREAM here or just a Kafka topic?

Also not sure what you mean by “to reduce the information” – “reduce” sounds like an aggregation but I am not sure if you actually mean a join?

Both streams contain a tree structure (id/parentId)

What is a “tree structure”? Are you referring to nested data (i.e., a STRUCT type in ksqlDB), or are you just saying that the records contain two attributes that describe their logical relationship?

The best way to help might be if you could give a concrete data example: just a few records for both inputs and explain what output you want to compute. Without the input, the shown output table is hard to understand.

I see you’ve read the Stack Overflow question, so you’ve seen the basic flow chart. The Kafka source connectors insert into separate topics. Those are the only topics that are touched outside of ksqlDB. The rest are ksqlDB STREAMs or TABLEs.

Yes, I guess it would be an aggregation. What I actually mean is that three rows could, depending on their content, be reduced to one or two rows, or stay as three rows.

I’m just saying that they contain two attributes that describe their logical relationship.

Thanks for taking the time; I hope this makes it a bit clearer. There are of course a few more columns, and the tree structure has more than one level, but I’ve kept things simple here. In the end, the solution is probably quite simple.

SELECT * FROM stream1 EMIT CHANGES;
| id | parentId | match | matchExtra1 |
|  1 |       58 |     1 |      Extra1 |
|  2 |       75 |     1 |      Extra1 |

SELECT * FROM stream2 EMIT CHANGES;
| id | match | matchExtra1 | matchExtra2 | matchExtra3 |
|  3 |     1 |      Extra6 |      Extra7 |      Extra8 |
|  4 |     1 |      Extra9 |      Extr10 |        tra8 |

CREATE STREAM stream1+2 AS SELECT *, 'stream1' AS source FROM stream1;
INSERT INTO stream1+2 SELECT *, 'stream2' AS source FROM stream2;

SELECT * FROM stream1+2 EMIT CHANGES;
| id | match | matchExtra1 | matchExtra2 | matchExtra3 | source  |
|  1 |     1 |      Extra1 |        null |        null | stream1 |
|  2 |     1 |      Extra1 |        null |        null | stream1 |
|  3 |     1 |      Extra6 |      Extra7 |      Extra8 | stream2 |
|  4 |     1 |      Extra9 |      Extr10 |        tra8 | stream2 |

CREATE TABLE table1extra AS SELECT LATEST_BY_OFFSET, etc FROM stream1 ...

CREATE TABLE table1+2 AS SELECT LATEST_BY_OFFSET, etc FROM stream1+2 ...

CREATE TABLE table AS
SELECT t12.*, t1e.matchExtra1 AS matchExtra2 FROM table1+2 AS t12
LEFT JOIN table1extra AS t1e ON t12.id = t1e.parentId;

SELECT * FROM table
| id | match | matchExtra1 | matchExtra2 | matchExtra3 | source  |
|  1 |     1 |      Extra1 |      Extra2 |      Extra3 | stream1 |
|  2 |     1 |      Extra1 |      Extra4 |      Extra5 | stream1 |
|  3 |     1 |      Extra6 |      Extra7 |      Extra8 | stream2 |
|  4 |     1 |      Extra9 |      Extr10 |        tra8 | stream2 |

-- One of the sources will have priority in the combined data
-- In this case, Extra 1 & 3 from stream 2 and Extra 2 from stream 1
-- The matching might also be partial, depending on the source and column
SELECT * FROM reducedtable
| uuid() | id                 | match | matchExtra1 | matchExtra2 | matchExtra3 |
| ...    | (1 matched with 2) |     1 |      Extra1 |      Extra2 |      Extra5 |
| ...    | (3 matched with 4) |     1 |      Extra9 |      Extra7 |        tra8 |

I am still not sure I can follow. I understand that you have two input streams with different schemas. You first merge both input streams using a unified schema (non-existing columns are filled with NULL), but it’s unclear why you do this. I also understand that you create two tables (one from stream1 and one from the merged stream) and afterward join both tables. Again, it’s unclear why you do this. Is this part of your requirement or goal, or part of your attempted solution to the problem? (It’s still unclear what you are trying to achieve.)

I would be more interested in what you are trying to achieve, and less in how you are currently trying to achieve it.

I also don’t understand the result (is this the intended result?). Given the input data you showed, it does not seem to add up. For example, the first result row | 1 | 1 | Extra1 | Extra2 | Extra3 | stream1 | shows Extra2 and Extra3, but there is no row in either the stream1 input or the stream2 input that has the value Extra2 or Extra3. Where does it come from? The same applies to the other result rows.

Given the query that computes the joins and input data, the result would be:

| t12.id | t12.parentId | t12.match | t12.matchExtra1 | t12.matchExtra2 | t12.matchExtra3 | t12.source | t1e.matchExtra2 |
|      1 |           58 |         1 |          Extra1 |            null |            null |    stream1 |            null |
|      2 |           75 |         1 |          Extra1 |            null |            null |    stream1 |            null |
|      3 |         null |         1 |          Extra6 |          Extra7 |          Extra8 |    stream2 |            null |
|      4 |         null |         1 |          Extra9 |          Extr10 |            tra8 |    stream2 |            null |

Because you join on t12.id = t1e.parentId, you try to join t12.id values 1, 2, 3, 4 to t1e.parentId values 58, 75, so no join happens. Because it is a left join, all t12 columns are preserved and the column matchExtra2 from t1e is filled with NULL.
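
To make that concrete, here is a small plain-Python sketch of the left-join behaviour (this is not ksqlDB code; lists of dictionaries stand in for the tables, the rows mirror the example data in this thread, and the helper name `left_join` is made up for illustration):

```python
# Plain-Python illustration of LEFT JOIN semantics; not ksqlDB code.
# The rows mirror the example data posted in this thread.
t12 = [
    {"id": 1, "matchExtra1": "Extra1", "source": "stream1"},
    {"id": 2, "matchExtra1": "Extra1", "source": "stream1"},
    {"id": 3, "matchExtra1": "Extra6", "source": "stream2"},
    {"id": 4, "matchExtra1": "Extra9", "source": "stream2"},
]
t1e = [
    {"id": 1, "parentId": 58, "matchExtra1": "Extra1"},
    {"id": 2, "parentId": 75, "matchExtra1": "Extra1"},
]


def left_join(left, right):
    """Join on left.id = right.parentId, keeping every left row."""
    by_parent = {row["parentId"]: row for row in right}
    result = []
    for row in left:
        hit = by_parent.get(row["id"])  # None when no parentId equals this id
        extra = hit["matchExtra1"] if hit else None
        result.append({**row, "t1e_matchExtra2": extra})
    return result


joined = left_join(t12, t1e)
for row in joined:
    print(row)
```

All four left rows survive, and the joined column is None in each of them because no parentId (58, 75) equals any id (1 to 4).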

Later you have a table reducedtable, and it says (1 matched with 2). Again, what does this mean? Also, how do you define a “match”? Note that ksqlDB allows equi-joins, as well as aggregation via GROUP BY, which also requires that the two rows have the same value in some column. Not sure if you are trying to “merge/join” rows on a non-equi condition (this would not be supported)?

The main purpose is to combine information between the two systems to get a full view of what data we have available in which system.

To do that, I’m aiming for a table that contains the latest versions of unique/distinct objects based on the contents of the two original streams (one stream from each system). Most of these objects exist in both streams but with a different schema and data. Some of this data is comparable between the two streams, and that is how we know which objects belong together. Some objects might exist in only one of the streams and also need to be handled.

The main comparable attribute between the streams can be compared exactly (such as a serial number), but since it is not globally unique, we need to compare other attributes as well, and those are not always equal (perhaps a Levenshtein match or similar).
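
As a rough illustration of what a Levenshtein-style comparison could look like, here is a minimal Python sketch. The helper names and the `max_distance` threshold are assumptions made for this example, not something ksqlDB provides or something taken from the actual data:

```python
def levenshtein(a: str, b: str) -> int:
    """Edit distance between a and b (insertions, deletions, substitutions)."""
    prev = list(range(len(b) + 1))  # distances from "" to each prefix of b
    for i, ca in enumerate(a, start=1):
        curr = [i]  # distance from a[:i] to ""
        for j, cb in enumerate(b, start=1):
            cost = 0 if ca == cb else 1
            curr.append(min(
                prev[j] + 1,         # delete ca
                curr[j - 1] + 1,     # insert cb
                prev[j - 1] + cost,  # substitute, or keep if equal
            ))
        prev = curr
    return prev[-1]


def is_fuzzy_match(a: str, b: str, max_distance: int = 2) -> bool:
    """Hypothetical rule: values match if their edit distance is small enough."""
    return levenshtein(a, b) <= max_distance


print(levenshtein("Extra8", "tra8"))    # 2
print(levenshtein("Extra1", "Extra6"))  # 1
```

Note that with short placeholder values like the ones in my example, a fixed threshold is too coarse: "Extra1" and "Extra6" are only one edit apart even though they should not match, so the threshold (or the whole rule) would have to be tuned per column on real data.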

The data from one of the streams contains a node tree structure, where the two attributes describe their logical relationship. The object we’re looking for is the last/end child of the structure. Data from the parent is one of the key matching attributes, so I used a join to bring this data together.

All joins and such are part of my attempted solution, not necessarily part of a working solution. My example data fails a bit as I tried to make it as small as possible. Hopefully my end result description tells more what I’m trying to do. Otherwise I’ll compile a fully working example.

ksqlDB does not support this case. Only equi-joins are supported right now. I cannot think of any workaround for now. We hope to add theta joins at some point, but it’s not on the current roadmap.
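
Outside ksqlDB (for example in a custom consumer application, which the original question said would be acceptable), the matching described in this thread could be sketched roughly as follows. This is only an illustration under stated assumptions: the rule in `columns_match` (equal, or one value contained in the other) is a guess at what “partial match” means, ids are assumed to be unique across both sources, and all function names are made up. Rows are first grouped by the exact `match` key (the equi part), then compared pairwise within each group (the non-equi part) and clustered with a small union-find.

```python
from collections import defaultdict
from itertools import combinations

EXTRA_COLUMNS = ("matchExtra1", "matchExtra2", "matchExtra3")


def columns_match(a, b):
    """Assumed rule: non-empty values match if one contains the other."""
    if not a or not b:
        return False
    return a in b or b in a  # also true when a == b


def rows_match(r1, r2):
    """Rows match if any extra column matches in the same position."""
    return any(columns_match(r1.get(c), r2.get(c)) for c in EXTRA_COLUMNS)


def cluster(rows):
    # Equi part: block rows by the exact key `match`.
    blocks = defaultdict(list)
    for row in rows:
        blocks[row["match"]].append(row)

    # Non-equi part: union-find over ids, merged on pairwise matches.
    parent = {row["id"]: row["id"] for row in rows}

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for block in blocks.values():
        for r1, r2 in combinations(block, 2):
            if rows_match(r1, r2):
                parent[find(r1["id"])] = find(r2["id"])

    groups = defaultdict(list)
    for row in rows:
        groups[find(row["id"])].append(row["id"])
    return sorted(sorted(ids) for ids in groups.values())


rows = [
    {"id": 1, "match": 1, "matchExtra1": "Extra1", "matchExtra2": "Extra2", "matchExtra3": "Extra3"},
    {"id": 2, "match": 1, "matchExtra1": "Extra1", "matchExtra2": "Extra4", "matchExtra3": "Extra5"},
    {"id": 3, "match": 1, "matchExtra1": "Extra6", "matchExtra2": "Extra7", "matchExtra3": "Extra8"},
    {"id": 4, "match": 1, "matchExtra1": "Extra9", "matchExtra2": "Extr10", "matchExtra3": "tra8"},
]
clusters = cluster(rows)
print(clusters)  # [[1, 2], [3, 4]]
```

The sketch only decides which ids belong together; picking which source wins for each column of the merged row is left out. The pairwise comparison is quadratic within each `match` group, so it relies on that key being selective enough.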
