Kafka iceberg sink connector config to map each topic to a iceberg table

I have multiple topics created through debezium source connector. I want to write each topic’s data into a iceberg table.

"topics": "dbserver1.inventory.customers,dbserver1.inventory.products_on_hand",
"iceberg.tables": "iceberg_catalog1.customers,iceberg_catalog1.products_on_hand",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields":"table",
"iceberg.tables.route-field": "__table",
"iceberg.table.iceberg_catalog1.customers.route-regex": "^customers$",
"iceberg.table.iceberg_catalog1.products_on_hand.route-regex": "^products_on_hand$",

I want dbserver1.inventory.customers data in iceberg_catalog1.customers and dbserver1.inventory.products_on_hand in iceberg_catalog1.products_on_hand.

Created an entry into “customers” table, it has flown correctly into iceberg_catalog1.customers table. But if I create an entry into “products_on_hand” table, Im getting error "Caused by: java.lang.NullPointerException: Cannot invoke \"org.apache.iceberg.types.Types$NestedField.fieldId()\" because the return value of \"org.apache.iceberg.Schema.findField(String)\" is null"

I think its using customer schema and trying to customer’s field from event containing "products_on_hand" payload.

It didnt work becauseproducts_on_hand table doesn’t have primary key as “id”.

This was our config.
“iceberg.tables.default-id-columns”: “id”,

Updated config with “product_id” as primary key for products_on_hand, it didnt work. Its again trying to fetch “Id” from the payload.
“iceberg.table.products_on_hand.id-columns”: “product_id”