Hi,
I am using confluentinc/cp-enterprise-replicator-executable:7.2.2 (Replicator) to move data from a source topic to target Kafka topics.
Below are the converters I am using:
header.converter=io.confluent.connect.replicator.util.ByteArrayConverter
key.converter=io.confluent.connect.replicator.util.ByteArrayConverter
value.converter=io.confluent.connect.json.JsonSchemaConverter
There are no issues with the key and header converters, but I am facing an issue with the value converter.
My use case is:
- The source Kafka topic does not have a schema; the values we receive are plain JSON strings.
- On the target Kafka topic, I am enabling a JSON schema for the topic value, and that schema matches the incoming data.
The issue is that JsonSchemaConverter tries to generate a (wrong) schema from the source data (a JSON string with the correct structure) and validates it against the configured Schema Registry URL (which holds the expected JSON structure).
Here is the wrong schema that JsonSchemaConverter creates dynamically:
{
  "oneOf": [
    {
      "type": "null"
    },
    {
      "connect.type": "bytes",
      "type": "string"
    }
  ]
}
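To make the mismatch concrete, here is a minimal Python sketch that compares the root type of the generated schema with a stand-in for the schema registered under the subject. The `registered` object and its field name are hypothetical (the real registered schema is not reproduced in this post), and `root_types` is just a helper written for this illustration. It only shows a mismatch at the schema root, which seems consistent with the TYPE_CHANGED difference at '#/' in the error below; it does not explain why the converter generates this schema.

```python
# Schema that JsonSchemaConverter generated (copied from the post).
generated = {
    "oneOf": [
        {"type": "null"},
        {"connect.type": "bytes", "type": "string"},
    ]
}

# Hypothetical stand-in for the schema registered under "test_1-value";
# the "id" field is invented for illustration only.
registered = {
    "type": "object",
    "properties": {"id": {"type": "string"}},
}


def root_types(schema):
    """Return the sorted list of types the schema allows at its root."""
    if "oneOf" in schema:
        return sorted(branch["type"] for branch in schema["oneOf"])
    return [schema["type"]]


# The generated schema allows null or string at the root,
# while the stand-in registered schema allows only an object.
print(root_types(generated))
print(root_types(registered))
```

Under these assumptions the two schemas share no root type, so the generated schema describes the value as nullable bytes-as-string rather than as a JSON object.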
Complete error stack trace:
[2023-03-14 04:50:04,332] ERROR WorkerSourceTask{id=replicator-sr-test-ie1-local-2} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:237)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:360)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:273)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.config.ConfigException: Failed to access JSON Schema data from topic test_1 : Schema being registered is incompatible with an earlier schema for subject "test_1-value", details: [Found incompatible change: Difference{jsonPath='#/', type=TYPE_CHANGED}]; error code: 409; error code: 409
at io.confluent.connect.json.JsonSchemaConverter.fromConnectData(JsonSchemaConverter.java:99)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$4(WorkerSourceTask.java:334)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
... 11 more
Can you please help me understand why it is creating a schema like this, and whether there is any solution?