hi, I am using azure event hub(kafka head) and I am trying to create a iceberg sink connector in kafka connect using the following config
{
"connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
"tasks.max": "2",
"topics": "metrics2",
"iceberg.tables": "feed.test-messages",
"iceberg.tables.auto-create-enabled": "true",
"iceberg.tables.schema-force-optional": "true",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://xxxxxxxx.snowflakecomputing.com/polaris/api/catalog",
"iceberg.catalog.io-impl": "org.apache.iceberg.azure.adlsv2.ADLSFileIO",
"iceberg.catalog.include-credentials": "true",
"iceberg.catalog.warehouse": "lakehouse-test-snowflake",
"iceberg.catalog.credential": "client_id:client_seccret",
"icebberg.catalog.scope": "PRINCIPAL_ROLE:xxx",
"name": "sink-feed-snowflake",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
I am using auto-create-table in Kafka Connect . and the table get’s created
and I can see a metadata folder and a data folder and inside metadata folder there is one json file. and data folder has a couple of parquet files
after further debugging found the below logs
ERROR Coordinator error during process, exiting thread (io.tabular.iceberg.connect.channel.CoordinatorThread)
java.lang.IllegalStateException: Unrecognized header bytes: 0x%02X 0x%02X [0, 0]
at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:686)
at org.apache.iceberg.avro.AvroEncoderUtil.decode(AvroEncoderUtil.java:73)
at org.apache.iceberg.connect.events.AvroUtil.decode(AvroUtil.java:63)
at io.tabular.iceberg.connect.channel.EventDecoder.decode(EventDecoder.java:73)
at io.tabular.iceberg.connect.channel.Channel.lambda$consumeAvailable$2(Channel.java:131)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at io.tabular.iceberg.connect.channel.Channel.consumeAvailable(Channel.java:125)
at io.tabular.iceberg.connect.channel.Coordinator.process(Coordinator.java:108)
at io.tabular.iceberg.connect.channel.CoordinatorThread.run(CoordinatorThread.java:40)
[2024-10-10 10:05:14,909] INFO Channel stopping (io.tabular.iceberg.connect.channel.Channel)
│ [2024-10-10 09:04:04,014] INFO Commit timeout reached. Now: 1728551044014, start: 1728551013945, timeout: 30000 (io.tabular.iceberg.connect.channel.CommitState) │
│ [2024-10-10 09:04:04,014] INFO Processing commit after responses for d505acc5-1b2c-4ebf-bc30-2a91bdbd4e90, isPartialCommit true (io.tabular.iceberg.connect.channel.Coordinator)
when I read the one metadata json file that was created, it had no snapshots.
then when I tried with the same Kafka Connect setup and same sink connector configs, but with confluent kafka it worked
is there an issues in iceberg sink connector using azure event hub or am I missing something?