Flink Performance Query

Hi All,

I have two Kafka topics written by an OLTP process, and I am joining them in the Flink Confluent client.

In both streams, I need to pick the latest record for each primary key, so I first created an intermediate Flink table to hold the latest record per key and then did the join from there.
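
To make the intended semantics concrete (keep only the latest record per primary key in each stream, then join on the key), here is a minimal plain-Python sketch. It is not the actual Flink job and says nothing about Flink's execution; the field names `pk` and `ts`, the helper functions, and the sample records are all hypothetical and chosen only for illustration.

```python
from typing import Any, Dict, Iterable, Tuple

Record = Dict[str, Any]


def latest_per_key(records: Iterable[Record], key: str = "pk", ts: str = "ts") -> Dict[Any, Record]:
    """Keep, for each primary key, the record with the highest timestamp."""
    latest: Dict[Any, Record] = {}
    for rec in records:
        k = rec[key]
        # Replace the stored record when this one is at least as new.
        if k not in latest or rec[ts] >= latest[k][ts]:
            latest[k] = rec
    return latest


def join_latest(left: Iterable[Record], right: Iterable[Record]) -> Dict[Any, Tuple[Record, Record]]:
    """Inner-join the latest record per key from both inputs."""
    l = latest_per_key(left)
    r = latest_per_key(right)
    # Only keys present on both sides survive an inner join.
    return {k: (l[k], r[k]) for k in l.keys() & r.keys()}


# Hypothetical sample data standing in for the two Kafka topics.
orders = [
    {"pk": 1, "ts": 10, "status": "NEW"},
    {"pk": 1, "ts": 20, "status": "PAID"},
    {"pk": 2, "ts": 15, "status": "NEW"},
]
customers = [
    {"pk": 1, "ts": 5, "name": "A"},
    {"pk": 1, "ts": 7, "name": "B"},
    {"pk": 3, "ts": 9, "name": "C"},
]

joined = join_latest(orders, customers)
```

Here only key 1 appears in both inputs, so the result pairs its newest order with its newest customer record.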

The performance is not good when I do this: it takes about 30 seconds for the stream to process. If I skip the intermediate table and just join the two query results directly, it's much faster (< 1 second). Can someone explain how this works, and why the former method is slow and the latter is fast?