I created a BQ Connector with below properties are true,
“allowNewBigQueryFields”: “true”,
“allowBigQueryRequiredFieldRelaxation”: “true”,
“allowSchemaUnionization”: “true”,
But the connector keep on failing if any change in the message schema. The expectation is that if any addition field came in message the connector will not fail and the BQ table needs to updated with the new field.
Attaching the connector config,
{
“name”: “”,
“config”: {
“connector.class”: “com.wepay.kafka.connect.bigquery.BigQuerySinkConnector”,
“tasks.max”: “1”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“transforms”: “ChangeTopicName”,
“errors.log.enable”: “true”,
“errors.log.include.messages”: “true”,
“topics”: “”,
“transforms.ChangeTopicName.type”: “org.apache.kafka.connect.transforms.RegexRouter”,
“transforms.ChangeTopicName.regex”: “”,
“transforms.ChangeTopicName.replacement”: “
“project”: “”,
“defaultDataset”: “”,
“schemaRetriever”: “com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever”,
“keyfile”: ,
“keySource”: “JSON”,
“sanitizeTopics”: “true”,
“autoCreateTables”: “false”,
“allowNewBigQueryFields”: “true”,
“allowBigQueryRequiredFieldRelaxation”: “true”,
“allowSchemaUnionization”: “true”,
“bigQueryRetry”: “1”,
“bigQueryPartitionDecorator”: “false”,
“principal.service.name”: “”,
“principal.service.password”: “”,
“autoUpdateSchemas”: “true”,
“consumer.sasl.jaas.config”: “org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";”,
“auto.evole”: “true”,
“maxWriteSize”: “1000”,
“consumer.topic.bootstrap.servers”: “”,
“value.converter.schema.registry.url”: “”,
“key.converter.schemas.enable”: “false”,
“value.converter.schemas.enable”: “false”,
“tableWriteWait”: “100”,
“bufferSize”: “10000”
}
}