I tried to test the capability of bigquery sink managed connector to upsert data.
The source of the data is from datagen and this is the setting
| Select output record value format | AVRO |
|---|---|
| Select a template | ORDERS |
| value.converter.decimal.format | NUMERIC |
|---|---|
| value.converter.replace.null.with.default | true |
| value.converter.reference.subject.name.strategy | DefaultReferenceSubjectNameStrategy |
| value.converter.schemas.enable | false |
| errors.tolerance | none |
| value.converter.value.subject.name.strategy | TopicNameStrategy |
| key.converter.key.subject.name.strategy | TopicNameStrategy |
| value.converter.ignore.default.for.nullables | false |
| Enable Connector Auto-restart | true |
| Schema context | default |
| Max interval between messages (ms) | 2000 |
And this is the bigquery sink connector
| Ingestion Mode | STREAMING |
|---|---|
| Input Kafka record value format | AVRO |
| value.converter.decimal.format | BASE64 |
|---|---|
| value.converter.replace.null.with.default | true |
| value.converter.reference.subject.name.strategy | DefaultReferenceSubjectNameStrategy |
| value.converter.schemas.enable | false |
| errors.tolerance | all |
| value.converter.value.subject.name.strategy | TopicNameStrategy |
| key.converter.key.subject.name.strategy | TopicNameStrategy |
| value.converter.ignore.default.for.nullables | false |
| Enable Connector Auto-restart | true |
| Schema context | default |
| Max poll interval(ms) | 300000 |
| Max poll records | 500 |
| Input Kafka record key format | STRING |
| Kafka Topic to BigQuery Table Map | order:order |
| Sanitize topics | true |
| Auto update schemas | DISABLED |
| Sanitize field names | false |
| Auto create tables | DISABLED |
| Use Date Time Formatter | false |
| Use INTEGER for INT8 and INT16 | false |
my bigquery table already has Primay Key
The sink connector are working well but, the issue started when i change the ingestion mode to Upsert or Upsert and Delete, the messages are sent to DLQ instead of BigQuery table.
I believe i already follow all the requirement from the documentation.
Can someone help me to make this sink work?
Best regards
