Hi Confluent community,
I am looking for advice on a ksqlDB / JSON_SR / Schema Registry issue.
Context
We have the following flow:
- Source topic: raw JSON, no Schema Registry
- ksqlDB: transforms the payload into the target structure
- Target topic: JSON Schema Registry enabled, with existing key and value schemas
- Broker-side schema validation is enabled on the target topic
- We are not allowed to let ksqlDB register new schema versions
- We must use the existing key/value schemas already registered in Schema Registry
The target key is a structured JSON object similar to:
{
  "businessId": "134883",
  "applicationId": "039"
}
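For reference, the key subject holds a JSON Schema roughly along these lines (a simplified sketch, not the exact registered schema; the field names and types match the example above, while required and additionalProperties are only illustrative of a typical governed schema):
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "businessId": { "type": "string" },
    "applicationId": { "type": "string" }
  },
  "required": ["businessId", "applicationId"],
  "additionalProperties": false
}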
The target value is a deeply nested JSON document.
What works
If we create the ksqlDB target stream with JSON_SR but without specifying KEY_SCHEMA_ID / VALUE_SCHEMA_ID, ksqlDB is able to produce records.
However, ksqlDB auto-registers new key and value schema versions. That is not acceptable for us because the target topic already has governed schemas / data contracts.
What we tried
We created the target stream by explicitly forcing the existing schemas:
CREATE STREAM TARGET_STREAM
WITH (
  KAFKA_TOPIC = 'target-topic',
  KEY_FORMAT = 'JSON_SR',
  VALUE_FORMAT = 'JSON_SR',
  KEY_SCHEMA_ID = <existing key schema ID>,
  VALUE_SCHEMA_ID = <existing value schema ID>
);
Then we run an INSERT INTO … SELECT … query from the raw JSON source.
The key is built with a STRUCT and also used in PARTITION BY:
INSERT INTO TARGET_STREAM
SELECT
  STRUCT(
    businessId := …,
    applicationId := …
  ) AS ROWKEY,
  STRUCT(…) AS payload
FROM SOURCE_STREAM
PARTITION BY STRUCT(
  businessId := …,
  applicationId := …
)
EMIT CHANGES;
The query is accepted and a persistent query is created. No new schema version is registered, which is good.
However, no record is produced to the target topic.
Error
The ksqlDB processing log shows the following error:
{
  "TARGET": "key",
  "ERRORMESSAGE": "Error serializing message to topic: target-topic. Converting Kafka Connect data to byte failed due to serialization error of topic target-topic: Hint: You probably forgot to add VALUE_SCHEMA_ID when creating the source.",
  "CAUSE": [
    "Converting Kafka Connect data to byte failed due to serialization error of topic target-topic:",
    "Error serializing JSON message",
    "Incompatible schema of type 'JSON'. Set id.compatibility.strict=false to disable this check"
  ]
}
Important detail: the failure is always on TARGET = "key".
Additional tests
We tested several variations:
- Raw JSON source produced by our upstream system
- A manually crafted "perfect" message aligned with a known working record
- Key built from different source fields
- messageDate formatting changes
- Explicit camelCase field names matching the existing schema
- Explicit CAST on the key:
CAST(
  STRUCT(
    businessId := IFNULL(…, ''),
    applicationId := IFNULL(…, '')
  )
  AS STRUCT<
    businessId STRING,
    applicationId STRING
  >
) AS ROWKEY
- Same CAST expression in PARTITION BY
- Schema Registry compatibility mode temporarily set to NONE
The error remains exactly the same:
TARGET = key
Incompatible schema of type 'JSON'
Set id.compatibility.strict=false to disable this check
We also tested writing JSON without JSON_SR to the target topic. That was rejected by the Confluent Cloud schema validator, which confirms that the target topic requires Schema Registry serialization.
What we cannot do
We tried to set the following ksqlDB properties before creating the persistent query:
SET 'id.compatibility.strict' = 'false';
SET 'producer.id.compatibility.strict' = 'false';
SET 'ksql.streams.producer.id.compatibility.strict' = 'false';
All were rejected by ksqlDB as unknown / unrecognized properties.
Questions
1. Is there a supported way to pass id.compatibility.strict=false to the internal ksqlDB producer / JSON Schema serializer?
2. Can this setting be applied per query, per ksqlDB cluster, or only at server configuration level?
3. Is there another SQL-only way to make the ksqlDB-generated key schema compatible with an existing JSON Schema key subject?
4. Is this a known limitation with ksqlDB + JSON_SR + existing KEY_SCHEMA_ID for structured keys?
5. Would switching the source topic to Avro help at all, or would the final JSON_SR serialization to the target topic still hit the same issue?
6. Is the recommended pattern in this case to let ksqlDB transform into an intermediate topic, then use a Java producer with KafkaJsonSchemaSerializer for the final write (see the sketch after this list), using:
- auto.register.schemas=false
- use.schema.id=<existing schema ID>
- id.compatibility.strict=false
- json.fail.invalid.schema=true
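To make question 6 concrete, here is a minimal sketch of the fallback producer we have in mind. The class and variable names, the -1 placeholder schema IDs, and the hard-coded sample records are illustrative only; the four serializer settings above are the point. The key and value serializers are configured separately so each can be pinned to its own existing schema ID:
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;

public class TargetTopicWriter {

    public static void main(String[] args) throws Exception {
        int keySchemaId = -1;   // placeholder: the existing key schema ID in Schema Registry
        int valueSchemaId = -1; // placeholder: the existing value schema ID

        // Settings shared by both serializers.
        Map<String, Object> common = new HashMap<>();
        common.put("schema.registry.url", "<schema-registry-url>");
        common.put("auto.register.schemas", false);   // never register new schema versions
        common.put("id.compatibility.strict", false); // the check that fails in ksqlDB
        common.put("json.fail.invalid.schema", true); // still validate payloads against the schema

        // Configure key and value serializers separately so each is pinned
        // to its own governed schema ID.
        Map<String, Object> keyConfig = new HashMap<>(common);
        keyConfig.put("use.schema.id", keySchemaId);
        Map<String, Object> valueConfig = new HashMap<>(common);
        valueConfig.put("use.schema.id", valueSchemaId);

        KafkaJsonSchemaSerializer<JsonNode> keySerializer = new KafkaJsonSchemaSerializer<>();
        keySerializer.configure(keyConfig, true); // isKey = true
        KafkaJsonSchemaSerializer<JsonNode> valueSerializer = new KafkaJsonSchemaSerializer<>();
        valueSerializer.configure(valueConfig, false);

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap-servers>");

        // Sample records for illustration; the real payload is the ksqlDB output.
        ObjectMapper mapper = new ObjectMapper();
        JsonNode key = mapper.readTree("{\"businessId\":\"134883\",\"applicationId\":\"039\"}");
        JsonNode value = mapper.readTree("{}");

        try (KafkaProducer<JsonNode, JsonNode> producer =
                 new KafkaProducer<>(props, keySerializer, valueSerializer)) {
            producer.send(new ProducerRecord<>("target-topic", key, value)).get();
        }
    }
}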
Any guidance or known workaround would be appreciated.
Thanks.