I’m facing an issue with a BigQuery Sink Connector V2 having Ingestion Mode UPSERT_DELETE and AVRO keys and values.
The following error is thrown when tombstone events are consumed:
```
Error encountered in task lcc-goz0v3-0. Executing stage ‘TASK_PUT’ with class ‘org.apache.kafka.connect.sink.SinkTask’.
Task ID
task-0
io.confluent.connect.bigquerystorage.exception.ConversionConnectException: Top-level Kafka Connect schema must be of type ‘struct’
at io.confluent.connect.bigquerystorage.convert.BigQueryRecordConverter.convertRecord(BigQueryRecordConverter.java:88)
at io.confluent.connect.bigquerystorage.convert.BigQueryRecordConverter.getConvertedRecord(BigQueryRecordConverter.java:105)
at io.confluent.connect.bigquerystorage.convert.BigQueryRecordConverter.getConvertedRecord(BigQueryRecordConverter.java:39)
at io.confluent.connect.bigquerystorage.write.storageapi.StorageWriteApiWriter$Builder.convertRecord(StorageWriteApiWriter.java:144)
at io.confluent.connect.bigquerystorage.write.storageapi.StorageWriteApiWriter$Builder.addRow(StorageWriteApiWriter.java:114)
at io.confluent.connect.bigquerystorage.BigQueryStorageSinkTask.writeSinkRecords(BigQueryStorageSinkTask.java:283)
at io.confluent.connect.bigquerystorage.BigQueryStorageSinkTask.put(BigQueryStorageSinkTask.java:332)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:753)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:379)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:270)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:239)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:272)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:328)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:341)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
```
If I use the same connector configuration but with Ingestion Mode BATCH LOADING, there are no errors.
Could you please advise how I can get the UPSERT_DELETE mode to work with Avro messages?
Thank you!