Kafka-python - Unable to use SSL certificate after migration from v1 to v2

I ran into the same issue with both kafka-python and kafka-go while migrating from v1.3.0 to v2.0.2, using the following producer config:

ConfigMap{
  "bootstrap.servers": "your_server",
  "security.protocol": "sasl_ssl",
  "sasl.mechanism":    "SCRAM-SHA-256",
  "sasl.username":     "user",
  "sasl.password":     "password",
  // "ssl.ca.pem":       string(mypemBytes),
  "ssl.ca.location":   "test.crt.pem",
}

I get the same error when specifying ssl.ca.pem instead of ssl.ca.location.
The certificate file itself looks fine: openssl x509 -text -in certificate.pem parses it without any problem.
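
Note that openssl x509 only confirms that the PEM file parses. It does not show whether that CA actually signed the certificate the broker presents. One way to check this outside librdkafka is a plain TLS handshake with Python's standard ssl module, roughly as sketched below. The name check_broker_cert is made up for illustration and is not part of any Kafka client; localhost:9093 and test.crt.pem in the usage comment are simply the values from the config and log in this post.

```python
import socket
import ssl


def check_broker_cert(host, port, cafile=None, timeout=5.0):
    """Try a TLS handshake and report whether the broker cert verifies.

    Hypothetical helper for illustration only. With cafile=None the
    system default CA store is used instead of a specific CA file.
    """
    try:
        # Loading the CA file happens inside the try block so that a
        # missing or unreadable file is reported rather than raised.
        context = ssl.create_default_context(cafile=cafile)
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=host):
                return True, "certificate verified"
    except ssl.SSLCertVerificationError as exc:
        # The CA did not validate the chain, or the hostname did not match.
        return False, f"verify failed: {exc.verify_message}"
    except OSError as exc:
        # Connection refused, timeout, missing CA file, non-TLS listener...
        return False, f"connection problem: {exc}"


# Example call with the values from this post:
#   check_broker_cert("localhost", 9093, cafile="test.crt.pem")
```

If this also reports a verification failure, the CA file is probably not the one that issued the broker certificate (or an intermediate is missing), rather than the client upgrade being at fault.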

log:

%7|1678893870.702|SASL|rdkafka#producer-1| [thrd:app]: Selected provider SCRAM (builtin) for SASL mechanism SCRAM-SHA-256
%7|1678893870.702|OPENSSL|rdkafka#producer-1| [thrd:app]: Using statically linked OpenSSL version OpenSSL 3.0.7 1 Nov 2022 (0x30000070, librdkafka built with 0x30000070)
%7|1678893870.704|SSL|rdkafka#producer-1| [thrd:app]: Loading CA certificate(s) from file test.crt.pem
%7|1678893870.705|BROKER|rdkafka#producer-1| [thrd:app]: sasl_ssl://localhost:9093/bootstrap: Added new broker with NodeId -1
%7|1678893870.705|BRKMAIN|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Enter main broker thread
%7|1678893870.705|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1678893870.705|CONNECT|rdkafka#producer-1| [thrd:app]: sasl_ssl://localhost:9093/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
%7|1678893870.705|CONNECT|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Received CONNECT op
%7|1678893870.705|INIT|rdkafka#producer-1| [thrd:app]: librdkafka v2.0.2 (0x20002ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, STRIP STATIC_LINKING GCC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD CURL HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER OAUTHBEARER_OIDC CRC32C_HW, debug 0x40203)
%7|1678893870.705|STATE|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1678893870.705|BROADCAST|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: Broadcasting state change
%7|1678893870.706|CONNECT|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: broker in state TRY_CONNECT connecting
%7|1678893870.706|STATE|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1678893870.706|BROADCAST|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: Broadcasting state change
%7|1678893870.705|CONF|rdkafka#producer-1| [thrd:app]: Client configuration:
%7|1678893870.706|CONF|rdkafka#producer-1| [thrd:app]:   client.software.name = confluent-kafka-go
%7|1678893870.706|CONF|rdkafka#producer-1| [thrd:app]:   client.software.version = 2.0.2
%7|1678893870.706|CONF|rdkafka#producer-1| [thrd:app]:   metadata.broker.list = localhost:9093
%7|1678893870.706|CONF|rdkafka#producer-1| [thrd:app]:   debug = generic,broker,security,conf
%7|1678893870.706|CONF|rdkafka#producer-1| [thrd:app]:   enabled_events = 329
%7|1678893870.707|CONF|rdkafka#producer-1| [thrd:app]:   security.protocol = sasl_ssl
%7|1678893870.707|CONF|rdkafka#producer-1| [thrd:app]:   ssl.ca.location = test.crt.pem
%7|1678893870.707|CONF|rdkafka#producer-1| [thrd:app]:   sasl.mechanisms = SCRAM-SHA-256
%7|1678893870.707|CONF|rdkafka#producer-1| [thrd:app]:   sasl.username = [redacted]
%7|1678893870.707|CONF|rdkafka#producer-1| [thrd:app]:   sasl.password = [redacted]
%7|1678893870.710|CONNECT|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Connecting to ipv4#127.0.0.1:9093 (sasl_ssl) with socket 12
%7|1678893870.710|CONNECT|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Connected to ipv4#127.0.0.1:9093
%7|1678893870.710|STATE|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Broker changed state CONNECT -> SSL_HANDSHAKE
%7|1678893870.710|BROADCAST|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: Broadcasting state change
%7|1678893870.710|ENDPOINT|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Enabled endpoint identification using hostname localhost
%7|1678893870.721|CONNECT|rdkafka#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 33ms: leader query
%7|1678893870.731|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: SSL handshake failed: ssl/statem/statem_clnt.c:1890:tls_post_process_server_certificate error:0A000086:SSL routines::certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl) (after 21ms in state SSL_HANDSHAKE) (_SSL)
%3|1678893870.731|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: SSL handshake failed: ssl/statem/statem_clnt.c:1890:tls_post_process_server_certificate error:0A000086:SSL routines::certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl) (after 21ms in state SSL_HANDSHAKE)
%7|1678893870.731|STATE|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Broker changed state SSL_HANDSHAKE -> DOWN
%7|1678893870.731|BROADCAST|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: Broadcasting state change
%7|1678893870.731|STATE|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Broker changed state DOWN -> INIT
%7|1678893870.731|BROADCAST|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: Broadcasting state change
%7|1678893870.731|DESTROY|rdkafka#producer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%4|1678893870.731|TERMINATE|rdkafka#producer-1| [thrd:app]: Producer terminating with 1 message (112 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
%7|1678893870.731|TERMINATE|rdkafka#producer-1| [thrd:app]: Interrupting timers
%7|1678893870.731|TERMINATE|rdkafka#producer-1| [thrd:app]: Sending TERMINATE to internal main thread
%7|1678893870.731|TERMINATE|rdkafka#producer-1| [thrd:app]: Joining internal main thread
%7|1678893870.731|TERMINATE|rdkafka#producer-1| [thrd:main]: Internal main thread terminating
%7|1678893870.750|DESTROY|rdkafka#producer-1| [thrd:main]: Destroy internal
%7|1678893870.750|BROADCAST|rdkafka#producer-1| [thrd:main]: Broadcasting state change
%7|1678893870.750|DESTROY|rdkafka#producer-1| [thrd:main]: Removing all topics
%7|1678893870.750|DESTROY|rdkafka#producer-1| [thrd:main]: Sending TERMINATE to sasl_ssl://localhost:9093/bootstrap
%7|1678893870.750|TERMINATE|rdkafka#producer-1| [thrd:main]: Purging reply queue
%7|1678893870.750|TERMINATE|rdkafka#producer-1| [thrd:main]: Decommissioning internal broker
%7|1678893870.750|TERMINATE|rdkafka#producer-1| [thrd:main]: Join 2 broker thread(s)
%7|1678893870.750|TERM|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1678893870.750|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Client is terminating (after 19ms in state INIT) (_DESTROY)
%7|1678893870.751|STATE|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Broker changed state INIT -> DOWN
%7|1678893870.751|BROADCAST|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: Broadcasting state change
%7|1678893870.751|BRKTERM|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1678893870.751|TERMINATE|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x12b00afa0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1678893870.751|FAIL|rdkafka#producer-1| [thrd:sasl_ssl://localhost:9093/bootstrap]: sasl_ssl://localhost:9093/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1678893870.750|TERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 2 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1678893870.752|FAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Client is terminating (after 47ms in state INIT) (_DESTROY)
%7|1678893870.752|STATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
%7|1678893870.752|BROADCAST|rdkafka#producer-1| [thrd::0/internal]: Broadcasting state change
%7|1678893870.752|BRKTERM|rdkafka#producer-1| [thrd::0/internal]: :0/internal: terminating: broker still has 2 refcnt(s), 0 buffer(s), 0 partition(s)
%7|1678893870.752|TERMINATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x12b00cba0), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1678893870.752|FAIL|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1678893870.754|TERMINATE|rdkafka#producer-1| [thrd:main]: Internal main thread termination done
%7|1678893870.754|TERMINATE|rdkafka#producer-1| [thrd:app]: Destroying op queues
%7|1678893870.754|TERMINATE|rdkafka#producer-1| [thrd:app]: Destroying SSL CTX
%7|1678893870.754|TERMINATE|rdkafka#producer-1| [thrd:app]: Termination done: freeing resources
--- FAIL: TestDummyEvent (0.07s)
panic: Unexpected pending: 1 [recovered]
        panic: Unexpected pending: 1

Apache Kafka version: 2.1.1-cp3
Operating system: OSX
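
Most of the debug output above is level 7 noise; the entries that matter are the level 3 FAIL line and the level 4 TERMINATE warning. Below is a small sketch for pulling such lines out of a captured log. It assumes the `%level|timestamp|facility|client| [thrd:name]: message` layout seen in this log, and the function name and regex are illustrative, not part of librdkafka.

```python
import re

# Layout as it appears in the log above:
# %<level>|<timestamp>|<facility>|<client>| [thrd:<thread>]: <message>
LOG_LINE = re.compile(
    r"^%(?P<level>\d)\|(?P<ts>\d+\.\d+)\|(?P<facility>[A-Z_]+)\|"
    r"(?P<client>[^|]+)\| \[thrd:(?P<thread>.*?)\]: (?P<message>.*)$"
)


def important_lines(log_text, max_level=4):
    """Yield (level, facility, message) for entries at or below max_level.

    Lower numbers are more severe (syslog-style), so max_level=4 keeps
    errors and warnings and drops the level 7 debug lines.
    """
    for line in log_text.splitlines():
        match = LOG_LINE.match(line.strip())
        if match and int(match["level"]) <= max_level:
            yield int(match["level"]), match["facility"], match["message"]
```

Run over the log in this post, it should leave only the SSL handshake failure and the "still in queue or transit" warning.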

Hi there

Did you ever get this working? How and where do we get the cert files? My cluster (Confluent SaaS) is currently configured to accept username/password.

When using the Confluent libraries I can get it working with SASL/SSL with no ssl_context specified, but when using AIOKafkaProducer I get stuck.
(For some reason I can't get the AWS Lambda build that uses the Confluent lib to work, so I'm trying the AIOKafka* setup.)

Python 3.9
G

I’m not sure if this is the same issue, but I am also running into a problem connecting to Confluent Cloud from faust, which I believe uses aiokafka under the hood.

I can use the Confluent Kafka Python library, but when I use faust, I get a [SSL: CERTIFICATE_VERIFY_FAILED] error.

To solve this I had to set the ssl_context using certifi and aiokafka.helpers:

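import certifi
import faust
from aiokafka.helpers import create_ssl_context
from faust.types.auth import AuthProtocol

# Point the SSL context at certifi's CA bundle so the broker
# certificate can be verified. `config` is my own settings object.
broker_credentials = faust.SASLCredentials(
    mechanism=faust.types.auth.SASLMechanism.PLAIN,
    ssl_context=create_ssl_context(cafile=certifi.where()),  # <- Fix
    username=config.get("sasl.username"),
    password=config.get("sasl.password"),
)
broker_credentials.protocol = AuthProtocol.SASL_SSL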
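
For plain AIOKafkaProducer without faust, the same idea might look like the sketch below. It only builds the keyword arguments and uses the standard ssl module, so it runs without extra packages. build_sasl_ssl_kwargs and the placeholder values are made up for illustration, and SASL PLAIN is assumed because the faust example uses it. With cafile=None the system CA store is used; passing certifi.where() instead would mirror the fix above.

```python
import ssl


def build_sasl_ssl_kwargs(bootstrap_servers, username, password, cafile=None):
    """Return keyword arguments for aiokafka's AIOKafkaProducer.

    Hypothetical helper for illustration. The ssl_context entry is what
    lets the client verify the broker certificate.
    """
    context = ssl.create_default_context(cafile=cafile)
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": username,
        "sasl_plain_password": password,
        "ssl_context": context,
    }


# Usage, assuming aiokafka is installed:
#   from aiokafka import AIOKafkaProducer
#   producer = AIOKafkaProducer(
#       **build_sasl_ssl_kwargs("your_server", "user", "password")
#   )
```

No client certificate or key files are needed for this; the context only has to trust a CA that signed the broker certificate.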