I'm joining two streams using outerJoin, but I'm getting a schema error for an internal topic, identified by the substring `KSTREAM-OUTERSHARED-0000000014-store-changelog-value`:
Schema being registered is incompatible with an earlier schema.
MISSING_UNION_BRANCH, message:reader union lacking writer type: RECORD.
The reader has the schema of one of the topics, and the writer has the schema of the other stream.
Update: while debugging I found that changing the order of the streams, i.e. `rightStream.outerJoin(leftStream)`, didn't raise any exception. Afterwards I realised that both records, leftRecord and rightRecord, were the same. Now the exception makes sense: since both records were the same, the internal topic took the schema of only one of them (leftRecord or rightRecord). Later, when records with their own schemas arrived on each stream, the exception occurred because the schema differed from the initial one.
I'm baffled by what's happening. My understanding is that if no join candidate is found, null is passed for the missing side. This feels like a bug.
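A simplified model of windowed outer-join semantics in plain Java (no Kafka dependency; `Rec`, `outerJoin` and the batch-style processing are illustrative assumptions, not Kafka Streams APIs). It matches the understanding above: when no partner with the same key is found within the window, the joiner receives null for the missing side. One caveat, as I understand recent Kafka Streams versions: the null-padded results are not emitted immediately but only after the join window (plus grace period) has closed, and the not-yet-joined records are buffered in an extra store meanwhile, which is presumably the `KSTREAM-OUTERSHARED` store named in the error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class OuterJoinSemantics {

    // A keyed, timestamped record; a simplified stand-in for a stream record.
    record Rec(String key, long ts, String value) {}

    // Joins two finite batches: pairs within windowMs produce joiner(l, r);
    // records with no partner produce joiner(l, null) or joiner(null, r).
    static List<String> outerJoin(List<Rec> left, List<Rec> right, long windowMs,
                                  BiFunction<String, String, String> joiner) {
        List<String> out = new ArrayList<>();
        boolean[] rightMatched = new boolean[right.size()];
        for (Rec l : left) {
            boolean matched = false;
            for (int i = 0; i < right.size(); i++) {
                Rec r = right.get(i);
                if (l.key().equals(r.key()) && Math.abs(l.ts() - r.ts()) <= windowMs) {
                    out.add(joiner.apply(l.value(), r.value()));
                    rightMatched[i] = true;
                    matched = true;
                }
            }
            if (!matched) {
                out.add(joiner.apply(l.value(), null)); // no right partner
            }
        }
        for (int i = 0; i < right.size(); i++) {
            if (!rightMatched[i]) {
                out.add(joiner.apply(null, right.get(i).value())); // no left partner
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = List.of(new Rec("k1", 0, "L1"), new Rec("k2", 0, "L2"));
        List<Rec> right = List.of(new Rec("k1", 500, "R1"), new Rec("k3", 0, "R3"));
        System.out.println(outerJoin(left, right, 1000, (l, r) -> l + "+" + r));
        // prints [L1+R1, L2+null, null+R3]
    }
}
```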
I was thinking about this a little more: do you have automatic schema registration enabled? That feature is not recommended in production. The topic in question may contain left and right records at the same time, so registering a single schema for it does not really make sense (or compatibility checks must be set to NONE).
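A toy model of why one schema per topic does not fit here, in plain Java with hypothetical record types `Order` and `Payment` standing in for the two Avro schemas. The store buffers unmatched records from both join sides and tags each value with the side it came from, loosely in the spirit of the internal `LeftOrRightValue` wrapper Kafka Streams uses for this store (an internal class, not public API). As far as I understand, with the default subject naming (`<topic>-value`) every such value is serialized for the same subject, so automatic registration would try to register two unrelated schemas under one subject. The class name is used below as a simplified stand-in for a schema.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class SharedOuterStoreModel {

    // Simplified model of one value in the shared store: exactly one side is set,
    // so a single changelog topic ends up holding values of both record types.
    record LeftOrRight(Object left, Object right) {
        static LeftOrRight ofLeft(Object v)  { return new LeftOrRight(v, null); }
        static LeftOrRight ofRight(Object v) { return new LeftOrRight(null, v); }
        Object value() { return left != null ? left : right; }
    }

    // Hypothetical record types standing in for the two Avro schemas.
    record Order(String id) {}
    record Payment(String id, double amount) {}

    // Collects the distinct "schemas" (here: simple class names) that a value
    // serializer would need to register for this one changelog topic.
    static Set<String> schemasSeenBy(List<LeftOrRight> changelog) {
        Set<String> schemas = new LinkedHashSet<>();
        for (LeftOrRight entry : changelog) {
            schemas.add(entry.value().getClass().getSimpleName());
        }
        return schemas;
    }

    public static void main(String[] args) {
        List<LeftOrRight> changelog = new ArrayList<>();
        changelog.add(LeftOrRight.ofLeft(new Order("o-1")));         // unmatched left record
        changelog.add(LeftOrRight.ofRight(new Payment("p-1", 9.5))); // unmatched right record
        // Two different schemas for one "<changelog-topic>-value" subject.
        System.out.println(schemasSeenBy(changelog)); // prints [Order, Payment]
    }
}
```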
It's totally unclear to me why flipping the left and right sides should make any difference…
If you did flip left and right and did not clear the topic, a previously written left record would now be read as a right record, and vice versa. You cannot just flip left and right without clearing the topic.
I flipped the topics and used a new Application ID too. The issue still persists.
Can I ask another question? Is it possible to write a function that outer-joins two topics and reuse it multiple times? I wrote such a function, and there too I'm getting a schema-incompatible error. I called the function twice, passing the result of the previous call as the left stream. I wanted to build a framework for my org. Now I'm stuck with two issues.
Update: I'm also passing a StreamJoined.as(), and I got a runtime exception saying that a different state store is already added; the exception also shows the name of the state store that I passed during the first join.
```java
public KStream<String, JoinedResult> outerJoin(
        KStream<String, GenericRecord> leftTopic,
        KStream<String, GenericRecord> rightTopic,
        JoinWindows joinWindow,
        BiFunction<GenericRecord, GenericRecord, IndexedRecord> recordJoiner,
        String joinedAs
) {
    // left -> right: emits matches plus left records without a right partner
    var leftStreamJoined = leftTopic.leftJoin(
            rightTopic,
            getJoiner(recordJoiner, JoinSource.LEFT_TOPIC),
            joinWindow,
            StreamJoined.as(joinedAs)
    );
    // right -> left: emits matches plus right records without a left partner
    // (uses the same joinedAs name as the first leftJoin call)
    var rightStreamJoined = rightTopic.leftJoin(
            leftTopic,
            getJoiner(recordJoiner, JoinSource.RIGHT_TOPIC),
            joinWindow,
            StreamJoined.as(joinedAs)
    );
    return leftStreamJoined.merge(rightStreamJoined);
}
```
This is the function I'm calling, and I rechecked to make sure that I'm passing different names.
Why are you implementing your own outerJoin() function? Couldn't you use the built-in KStream.outerJoin(...)?
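Besides the naming problem, merging two left joins differs from a single outer join in one more way, sketched here in plain Java (toy `Rec`, `leftJoin` and `mergedLeftJoins` helpers, not Kafka Streams APIs). When a left and a right record match, `left.leftJoin(right)` emits the pair and `right.leftJoin(left)` emits it again, so the merged stream contains every match twice, whereas an outer join emits each match once.

```java
import java.util.ArrayList;
import java.util.List;

public class MergedLeftJoinsVsOuterJoin {

    record Rec(String key, long ts, String value) {}

    // Windowed left join on finite batches: every in-window match is emitted,
    // and an outer record with no match is emitted once with a null partner.
    static List<String> leftJoin(List<Rec> outer, List<Rec> inner, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Rec o : outer) {
            boolean matched = false;
            for (Rec i : inner) {
                if (o.key().equals(i.key()) && Math.abs(o.ts() - i.ts()) <= windowMs) {
                    out.add(o.value() + "+" + i.value());
                    matched = true;
                }
            }
            if (!matched) {
                out.add(o.value() + "+null");
            }
        }
        return out;
    }

    // The pattern from the question: left.leftJoin(right) merged with right.leftJoin(left).
    static List<String> mergedLeftJoins(List<Rec> left, List<Rec> right, long windowMs) {
        List<String> out = new ArrayList<>(leftJoin(left, right, windowMs));
        out.addAll(leftJoin(right, left, windowMs));
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = List.of(new Rec("k1", 0, "L1"), new Rec("k2", 0, "L2"));
        List<Rec> right = List.of(new Rec("k1", 500, "R1"));
        // The L1/R1 match shows up twice: once as "L1+R1", once as "R1+L1".
        System.out.println(mergedLeftJoins(left, right, 1000)); // prints [L1+R1, L2+null, R1+L1]
    }
}
```

If the question's `getJoiner(..., JoinSource...)` normalizes the argument order, as its name suggests, the two copies of a match would presumably even be identical `JoinedResult` values.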
> Update: I'm also passing a StreamJoined.as(), and I got a runtime exception saying that a different state store is already added; the exception also shows the name of the state store that I passed during the first join.
That is expected. You call leftJoin() twice, and you must provide a unique name for each store: both leftJoin() calls add their own state stores, and they cannot share the same state store. Passing in the same name does not allow you to share the same store.
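A toy sketch of the naming rule in plain Java; `StoreRegistry` and `registerOuterJoinStores` are hypothetical stand-ins, not Kafka Streams classes, and the exception text is made up. The idea is to derive one distinct name per `leftJoin()` from the caller's base name (in the function from the question, something like `StreamJoined.as(joinedAs + "-left")` and `StreamJoined.as(joinedAs + "-right")`), and to give every call of the reusable helper its own base name.

```java
import java.util.HashSet;
import java.util.Set;

public class JoinStoreNaming {

    // Toy stand-in for a topology builder: it refuses to add a second
    // state store under a name that is already taken.
    static final class StoreRegistry {
        private final Set<String> names = new HashSet<>();

        void addStore(String name) {
            if (!names.add(name)) {
                throw new IllegalStateException("Store " + name + " is already added");
            }
        }
    }

    // Derives one distinct name per leftJoin() from a caller-supplied base name.
    // The "-left"/"-right" suffixes are an arbitrary, hypothetical convention.
    static void registerOuterJoinStores(StoreRegistry registry, String joinedAs) {
        registry.addStore(joinedAs + "-left");
        registry.addStore(joinedAs + "-right");
    }

    public static void main(String[] args) {
        StoreRegistry registry = new StoreRegistry();
        // Reusing the helper works as long as each call gets its own base name.
        registerOuterJoinStores(registry, "orders-payments");
        registerOuterJoinStores(registry, "orders-payments-shipments");
        try {
            registerOuterJoinStores(registry, "orders-payments"); // same base name again
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints Store orders-payments-left is already added
        }
    }
}
```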