Hello,
I am trying to insert protobuf serialized messages from a topic with a schema to ClickHouse. I am using the confluent managed clickhouse connector with the following settings:
| Table Refresh Interval | 0 |
|---|---|
| Bypass RowBinary | true |
| Exactly Once | false |
| Error Tolerance | none |
| Enable DLQ Context Headers | false |
| ClickHouse Settings | input_format_binary_read_json_as_string=1 |
| Tolerate State Mismatch | false |
| value.converter.decimal.format | BASE64 |
| value.converter.replace.null.with.default | false |
| value.converter.reference.subject.name.strategy | DefaultReferenceSubjectNameStrategy |
| value.converter.schemas.enable | false |
| value.converter.value.subject.name.strategy | TopicNameStrategy |
| key.converter.key.subject.name.strategy | TopicNameStrategy |
| value.converter.ignore.default.for.nullables | true |
| Enable Connector Auto-restart | true |
| Schema context | default |
| Max poll interval(ms) | 60000 |
| Input Kafka record key format | JSON |
| Max poll records | 5000 |
My protobuf schema, while being quite large has “oneof” properties. It appears that the connector does not handle these fields well as I receive this error:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:787)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:379)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:270)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:239)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:272)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:328)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:341)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.RuntimeException: Number of records: 3190
at com.clickhouse.kafka.connect.util.Utils.handleException(Utils.java:122)
at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:56)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:753)
… 11 more
Caused by: java.lang.RuntimeException: Topic: [analytics_events_v2], Partition: [11], MinOffset: [1883872], MaxOffset: [1887061], (QueryId: [37b0ab24-32e2-4535-bcd3-41bd9c81c3b2])
at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:57)
at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:143)
at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:102)
at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:48)
… 12 more
Caused by: java.lang.IllegalArgumentException: Class io.confluent.connect.protobuf.ProtobufData$SchemaWrapper declares multiple JSON fields named ‘parameters’; conflict is caused by fields io.confluent.connect.protobuf.ProtobufData$SchemaWrapper#parameters and org.apache.kafka.connect.data.SchemaBuilder#parameters
See gson/Troubleshooting.md at 3488c30c65afb7bc3715c68b2a4f10ca2500c821 · google/gson · GitHub
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory.createDuplicateFieldException(ReflectiveTypeAdapterFactory.java:313)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory.getBoundFields(ReflectiveTypeAdapterFactory.java:409)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory.create(ReflectiveTypeAdapterFactory.java:161)
at com.google.gson.Gson.getAdapter(Gson.java:628)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:57)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$2.write(ReflectiveTypeAdapterFactory.java:247)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.write(ReflectiveTypeAdapterFactory.java:490)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:73)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$2.write(ReflectiveTypeAdapterFactory.java:247)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.write(ReflectiveTypeAdapterFactory.java:490)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:73)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.write(CollectionTypeAdapterFactory.java:99)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.write(CollectionTypeAdapterFactory.java:59)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:73)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$2.write(ReflectiveTypeAdapterFactory.java:247)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.write(ReflectiveTypeAdapterFactory.java:490)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:73)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$2.write(ReflectiveTypeAdapterFactory.java:247)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.write(ReflectiveTypeAdapterFactory.java:490)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:73)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.write(CollectionTypeAdapterFactory.java:99)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.write(CollectionTypeAdapterFactory.java:59)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:73)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$2.write(ReflectiveTypeAdapterFactory.java:247)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.write(ReflectiveTypeAdapterFactory.java:490)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:73)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$2.write(ReflectiveTypeAdapterFactory.java:247)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.write(ReflectiveTypeAdapterFactory.java:490)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:73)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.write(CollectionTypeAdapterFactory.java:99)
at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.write(CollectionTypeAdapterFactory.java:59)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:73)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$2.write(ReflectiveTypeAdapterFactory.java:247)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.write(ReflectiveTypeAdapterFactory.java:490)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:73)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$2.write(ReflectiveTypeAdapterFactory.java:247)
at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.write(ReflectiveTypeAdapterFactory.java:490)
at com.google.gson.internal.bind.ObjectTypeAdapter.write(ObjectTypeAdapter.java:184)
at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.write(TypeAdapterRuntimeTypeWrapper.java:73)
at com.google.gson.internal.bind.MapTypeAdapterFactory$Adapter.write(MapTypeAdapterFactory.java:222)
at com.google.gson.internal.bind.MapTypeAdapterFactory$Adapter.write(MapTypeAdapterFactory.java:154)
at com.google.gson.Gson.toJson(Gson.java:944)
at com.google.gson.Gson.toJson(Gson.java:899)
at com.google.gson.Gson.toJson(Gson.java:848)
at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertJsonV1(ClickHouseWriter.java:896)
at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertJson(ClickHouseWriter.java:846)
at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:202)
at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:55)