Iceberg sink connector not working with Azure Event Hubs standard tier

Background:

  • cannot get a sink connector working from Azure Event Hubs to Iceberg tables stored in Azure ADLSv2
  • using Azure Event Hubs standard tier
  • using Kafka Connect on Azure Kubernetes
  • using Snowflake’s Iceberg REST catalog (AKA “Polaris”)

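Suspected problem:

  • the Azure Event Hubs standard tier appears not to support Kafka transactions, which the connector's producer requests (the error further down says coordinator type TRANSACTION is only supported in the Premium or Dedicated SKU)
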
Question:

  • Has anyone encountered this problem? Is it in fact caused by the Azure Event Hubs standard tier not supporting Kafka transactions, and if so, is there a workaround?

Iceberg sink connector configuration:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: <CONNECTOR_NAME>
  labels:
    strimzi.io/cluster: <CLUSTER_NAME>
spec:
  class: io.tabular.iceberg.connect.IcebergSinkConnector
  tasksMax: 1
  config:

    # iceberg catalog
    iceberg.catalog.type: rest
    iceberg.catalog.uri: https://<ACCOUNT_NAME>.privatelink.snowflakecomputing.com/polaris/api/catalog
    iceberg.catalog.credential: <CLIENT_ID:CLIENT_SECRET>
    iceberg.catalog.warehouse: <WAREHOUSE_NAME>
    iceberg.catalog.scope: "PRINCIPAL_ROLE:[REDACTED]"
    iceberg.tables.auto-create-enabled: "true"
    
    # topics  
    topics: <TOPIC_NAME>
    iceberg.tables: <SCHEMA_NAME.TABLE_NAME>
    
    # azure
    adls.connection-string: <ADLSV2_CONNECTION_STRING> 
    iceberg.catalog.io-impl: "org.apache.iceberg.azure.adlsv2.ADLSFileIO"
    iceberg.catalog.include-credentials: "true"

Error returned by Kafka Connect:

ERROR [<CONNECTOR_NAME>|task-0] WorkerSinkTask{id=<CONNECTOR_NAME>-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-<CONNECTOR_NAME>-0]
org.apache.kafka.common.KafkaException: Could not find a coordinator with type TRANSACTION with key <KEY> due to unexpected error: FindCoordinator asked for coordinator with type code 1 which is not supported. type code 1 is only supported in Premium or Dedicated sku.
        at org.apache.kafka.clients.producer.internals.TransactionManager$FindCoordinatorHandler.handleResponse(TransactionManager.java:1508)
        at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1236)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
        at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:463)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:339)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:253)
        at java.base/java.lang.Thread.run(Thread.java:840)
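
Possible standalone check:

To rule out the connector itself, the same transactional handshake could be attempted with a plain Kafka client against the Event Hubs Kafka endpoint. The following is only a sketch under assumptions, not something taken from the setup above: it assumes the confluent-kafka Python package is installed, that the namespace is reachable at the public <NAMESPACE>.servicebus.windows.net:9093 endpoint (a private endpoint may need a different host), and that authentication uses a namespace connection string. The function names and the "txn-probe" transactional id are made up for illustration.

```python
import sys


def build_producer_config(namespace: str, connection_string: str,
                          transactional_id: str = "txn-probe") -> dict:
    """Client settings for the Event Hubs Kafka endpoint (hypothetical helper)."""
    return {
        # Event Hubs exposes its Kafka-compatible endpoint on port 9093.
        "bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        # The literal string "$ConnectionString" is the expected user name.
        "sasl.username": "$ConnectionString",
        "sasl.password": connection_string,
        # Setting a transactional id makes the producer look up a
        # TRANSACTION coordinator, the request that fails in the trace above.
        "transactional.id": transactional_id,
    }


def supports_transactions(config: dict, timeout_s: float = 30.0) -> bool:
    """Return True if init_transactions() succeeds, False on any KafkaException."""
    # Imported lazily so build_producer_config works without the client installed.
    from confluent_kafka import KafkaException, Producer

    producer = Producer(config)
    try:
        producer.init_transactions(timeout_s)
        return True
    except KafkaException as exc:
        # Timeouts also land here, so read the printed error before concluding.
        print(f"init_transactions failed: {exc}", file=sys.stderr)
        return False
```

Calling supports_transactions(build_producer_config("<NAMESPACE>", "<EVENT_HUBS_CONNECTION_STRING>")) from a pod in the same cluster should show whether the failure reproduces without Kafka Connect or the Iceberg sink involved. If it fails with the same "type code 1" message, the limitation is on the Event Hubs side rather than in the connector configuration.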