JsonSchemaConverter generating wrong schema

Hi,
I am using confluentinc/cp-enterprise-replicator-executable:7.2.2 (Replicator) to move data from a source topic to target Kafka topics.

Below are the converters I am using:
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.json.JsonSchemaConverter

There are no issues with the key and header converters, but I am facing an issue with the value converter.

My use case:

  1. The source Kafka topic does not have a schema; the values we receive are plain JSON strings.
  2. On the target Kafka topic, I am enabling JSON Schema for the value, with a schema that matches the incoming data.

The issue is that JsonSchemaConverter is generating a (wrong) schema from the source data (a JSON string with the correct structure) and validating it against the configured schema.registry.url, which already holds the expected JSON structure.

Here is the wrong schema that JsonSchemaConverter creates dynamically:
{
  "oneOf": [
    {
      "type": "null"
    },
    {
      "connect.type": "bytes",
      "type": "string"
    }
  ]
}
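From what I can tell (my reading, which may be wrong), this shape is the JSON Schema rendering of Connect's optional BYTES type: since the source records carry no schema, the converter is handed the value as optional bytes and emits oneOf [null, string with "connect.type": "bytes"]. One workaround I am considering (a sketch only, not tested) is to pass values through as raw bytes, the same way the key and header are already handled, so no schema is derived or registered:

```properties
# Sketch: stop JsonSchemaConverter from deriving a schema by
# replicating the value as raw bytes, mirroring the key/header config.
value.converter=io.confluent.connect.replicator.util.ByteArrayConverter
```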

Complete error stack trace:

[2023-03-14 04:50:04,332] ERROR WorkerSourceTask{id=replicator-sr-test-ie1-local-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:237)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:334)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:360)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:273)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.config.ConfigException: Failed to access JSON Schema data from topic test_1 : Schema being registered is incompatible with an earlier schema for subject "test_1-value", details: [Found incompatible change: Difference{jsonPath='#/', type=TYPE_CHANGED}]; error code: 409; error code: 409
	at io.confluent.connect.json.JsonSchemaConverter.fromConnectData(JsonSchemaConverter.java:99)
	at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$4(WorkerSourceTask.java:334)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
	... 11 more
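For reference, the 409 in the trace is Schema Registry rejecting the auto-generated schema as incompatible with what is already registered for the subject. The subject's current schema and compatibility level can be checked directly against Schema Registry's REST API (sketch; the registry URL is a placeholder for your actual schema.registry.url):

```shell
# Placeholder -- replace with the real schema.registry.url value.
REGISTRY=http://localhost:8081

# Latest schema registered for the value subject of topic test_1
curl -s "$REGISTRY/subjects/test_1-value/versions/latest"

# Compatibility level for that subject (falls back to the global default if unset)
curl -s "$REGISTRY/config/test_1-value"
```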

Can you please help me understand why it is creating a schema like this, and is there any solution?
