I have multiple topics created through debezium source connector. I want to write each topic’s data into a iceberg table.
"topics": "dbserver1.inventory.customers,dbserver1.inventory.products_on_hand",
"iceberg.tables": "iceberg_catalog1.customers,iceberg_catalog1.products_on_hand",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields":"table",
"iceberg.tables.route-field": "__table",
"iceberg.table.iceberg_catalog1.customers.route-regex": "^customers$",
"iceberg.table.iceberg_catalog1.products_on_hand.route-regex": "^products_on_hand$",
I want dbserver1.inventory.customers
data in iceberg_catalog1.customers
and dbserver1.inventory.products_on_hand
in iceberg_catalog1.products_on_hand
.
Created an entry into “customers
” table, it has flown correctly into iceberg_catalog1.customers table
. But if I create an entry into “products_on_hand
” table, Im getting error "Caused by: java.lang.NullPointerException: Cannot invoke \"org.apache.iceberg.types.Types$NestedField.fieldId()\" because the return value of \"org.apache.iceberg.Schema.findField(String)\" is null"
I think its using customer schema and trying to customer’s field from event containing "products_on_hand"
payload.