Tabular sink connector not working with Azure Event Hub (Kafka head)

Hi, I am using Azure Event Hub (Kafka head) and am trying to create an Iceberg sink connector in Kafka Connect with the following config:

{
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "tasks.max": "2",
        "topics": "metrics2",
        "iceberg.tables": "feed.test-messages",
        "iceberg.tables.auto-create-enabled": "true",
        "iceberg.tables.schema-force-optional": "true",
        "iceberg.catalog.type": "rest",
        "iceberg.catalog.uri": "https://xxxxxxxx.snowflakecomputing.com/polaris/api/catalog",
        "iceberg.catalog.io-impl": "org.apache.iceberg.azure.adlsv2.ADLSFileIO",
        "iceberg.catalog.include-credentials": "true",
        "iceberg.catalog.warehouse": "lakehouse-test-snowflake",
        "iceberg.catalog.credential": "client_id:client_seccret",
        "icebberg.catalog.scope": "PRINCIPAL_ROLE:xxx",
        "name": "sink-feed-snowflake",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false"
    }

I am using table auto-creation in Kafka Connect, and the table gets created. I can see a metadata folder and a data folder. The metadata folder contains one JSON file, and the data folder has a couple of Parquet files.

After further debugging, I found the logs below:

 ERROR Coordinator error during process, exiting thread (io.tabular.iceberg.connect.channel.CoordinatorThread)
java.lang.IllegalStateException: Unrecognized header bytes: 0x%02X 0x%02X [0, 0]
  at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:686)
  at org.apache.iceberg.avro.AvroEncoderUtil.decode(AvroEncoderUtil.java:73)
  at org.apache.iceberg.connect.events.AvroUtil.decode(AvroUtil.java:63)
  at io.tabular.iceberg.connect.channel.EventDecoder.decode(EventDecoder.java:73)
  at io.tabular.iceberg.connect.channel.Channel.lambda$consumeAvailable$2(Channel.java:131)
  at java.base/java.lang.Iterable.forEach(Iterable.java:75)
  at io.tabular.iceberg.connect.channel.Channel.consumeAvailable(Channel.java:125)
  at io.tabular.iceberg.connect.channel.Coordinator.process(Coordinator.java:108)
  at io.tabular.iceberg.connect.channel.CoordinatorThread.run(CoordinatorThread.java:40)
[2024-10-10 10:05:14,909] INFO Channel stopping (io.tabular.iceberg.connect.channel.Channel)
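
To make the failing check in the stack trace easier to reason about, here is a minimal Python sketch of a two-byte header check. It is not the connector's code. The magic bytes 0xC2 0x01 are an assumption about what Iceberg's Avro encoder writes at the start of each payload, and `describe_header` is a hypothetical helper name. The `[0, 0]` in the exception suggests the first two bytes of the message read from the control topic were both zero.

```python
# Assumption: Iceberg's Avro encoder prefixes each encoded payload with these
# two magic bytes. Verify against the Iceberg version you are running.
ICEBERG_AVRO_MAGIC = bytes([0xC2, 0x01])


def describe_header(payload: bytes) -> str:
    """Classify a raw record value by its first two bytes (hypothetical helper)."""
    if len(payload) < 2:
        return "too short to carry a header"
    header = payload[:2]
    if header == ICEBERG_AVRO_MAGIC:
        return "looks like an Iceberg Avro-encoded event"
    # Mirrors the shape of the message in the stack trace.
    return "unrecognized header bytes: 0x%02X 0x%02X" % (header[0], header[1])


# A payload starting with two zero bytes, as reported in the exception.
print(describe_header(bytes([0x00, 0x00, 0x10])))
```

Running a check like this against raw record values consumed from the connector's control topic could show whether the messages stored there start with the expected header or with something else.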

[2024-10-10 09:04:04,014] INFO Commit timeout reached. Now: 1728551044014, start: 1728551013945, timeout: 30000 (io.tabular.iceberg.connect.channel.CommitState)
[2024-10-10 09:04:04,014] INFO Processing commit after responses for d505acc5-1b2c-4ebf-bc30-2a91bdbd4e90, isPartialCommit true (io.tabular.iceberg.connect.channel.Coordinator)
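
The numbers in the commit timeout line can be checked with a little arithmetic. The sketch below only uses the values printed in the log. Treating them as epoch milliseconds is an assumption based on their magnitude, and the variable names are illustrative.

```python
# Values copied from the "Commit timeout reached" log line.
now_ms = 1728551044014
start_ms = 1728551013945
timeout_ms = 30000

# Time the coordinator waited before giving up on outstanding responses.
elapsed_ms = now_ms - start_ms

print(elapsed_ms)               # 30069
print(elapsed_ms > timeout_ms)  # True
```

So the coordinator waited just over the 30000 ms timeout before logging the commit with isPartialCommit true.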

When I read the one metadata JSON file that was created, it had no snapshots.

Then I tried the same Kafka Connect setup and the same sink connector config with Confluent Kafka, and it worked.

Is there an issue with the Iceberg sink connector when using Azure Event Hub, or am I missing something?
