Connect faust to confluet bootsrap server not working

import faust

# { liquidity_pool: currency, amount: amount, date: date }
class Transaction(faust.Record):
    liquidity_pool: str
    amount: int
    date: float

broker_credentials=faust.SASLCredentials(
    mechanism=faust.types.auth.SASLMechanism.PLAIN,
    username='------',
    password='------',
)

app = faust.App(
    "liquidity-pool-monitoring",
    broker="kafka://pkc-ldjyd.southamerica-east1.gcp.confluent.cloud:9092"
    broker_credentials=broker_credentials
)
topic = app.topic('transfer.Done', value_type=Transaction)

if __name__ == "__main__":
    app.main()

Giving the error:

starting➢ ◟[2022-09-20 16:01:46,772] [53481] [ERROR] [^Worker]: Error: KafkaConnectionError('Connection at pkc-ldjyd.southamerica-east1.gcp.confluent.cloud:9092 closed') 
Traceback (most recent call last):
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/conn.py", line 375, in _on_read_task_error
    read_task.result()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/conn.py", line 518, in _read
    resp = await reader.readexactly(4)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/streams.py", line 721, in readexactly
    raise exceptions.IncompleteReadError(incomplete, n)
asyncio.exceptions.IncompleteReadError: 0 bytes read on a total of 4 expected bytes

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/worker.py", line 279, in execute_from_commandline
    self.loop.run_until_complete(self._starting_fut)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 792, in start
    await self._default_start()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 799, in _default_start
    await self._actually_start()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 823, in _actually_start
    await child.maybe_start()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 851, in maybe_start
    await self.start()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 792, in start
    await self._default_start()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 799, in _default_start
    await self._actually_start()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 823, in _actually_start
    await child.maybe_start()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 851, in maybe_start
    await self.start()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 792, in start
    await self._default_start()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 799, in _default_start
    await self._actually_start()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 816, in _actually_start
    await self.on_start()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 1289, in on_start
    await producer.start()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/producer/producer.py", line 296, in start
    await self.client.bootstrap()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/client.py", line 210, in bootstrap
    bootstrap_conn = await create_conn(
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/conn.py", line 96, in create_conn
    await conn.connect()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/conn.py", line 234, in connect
    await self._do_sasl_handshake()
  File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/conn.py", line 263, in _do_sasl_handshake
    response = await self.send(sasl_handshake)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
    return fut.result()
kafka.errors.KafkaConnectionError: KafkaConnectionError: Connection at pkc-ldjyd.southamerica-east1.gcp.confluent.cloud:9092 closed
◜stopping

On localhost works fine

1 Like