import faust
# { liquidity_pool: currency, amount: amount, date: date }
class Transaction(faust.Record):
liquidity_pool: str
amount: int
date: float
broker_credentials=faust.SASLCredentials(
mechanism=faust.types.auth.SASLMechanism.PLAIN,
username='------',
password='------',
)
app = faust.App(
"liquidity-pool-monitoring",
broker="kafka://pkc-ldjyd.southamerica-east1.gcp.confluent.cloud:9092"
broker_credentials=broker_credentials
)
topic = app.topic('transfer.Done', value_type=Transaction)
if __name__ == "__main__":
app.main()
Giving the error:
starting➢ ◟[2022-09-20 16:01:46,772] [53481] [ERROR] [^Worker]: Error: KafkaConnectionError('Connection at pkc-ldjyd.southamerica-east1.gcp.confluent.cloud:9092 closed')
Traceback (most recent call last):
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/conn.py", line 375, in _on_read_task_error
read_task.result()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/conn.py", line 518, in _read
resp = await reader.readexactly(4)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/streams.py", line 721, in readexactly
raise exceptions.IncompleteReadError(incomplete, n)
asyncio.exceptions.IncompleteReadError: 0 bytes read on a total of 4 expected bytes
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/worker.py", line 279, in execute_from_commandline
self.loop.run_until_complete(self._starting_fut)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 792, in start
await self._default_start()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 799, in _default_start
await self._actually_start()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 823, in _actually_start
await child.maybe_start()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 851, in maybe_start
await self.start()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 792, in start
await self._default_start()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 799, in _default_start
await self._actually_start()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 823, in _actually_start
await child.maybe_start()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 851, in maybe_start
await self.start()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 792, in start
await self._default_start()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 799, in _default_start
await self._actually_start()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/mode/services.py", line 816, in _actually_start
await self.on_start()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 1289, in on_start
await producer.start()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/producer/producer.py", line 296, in start
await self.client.bootstrap()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/client.py", line 210, in bootstrap
bootstrap_conn = await create_conn(
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/conn.py", line 96, in create_conn
await conn.connect()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/conn.py", line 234, in connect
await self._do_sasl_handshake()
File "/Users/maurobenedito/Documents/GitHub/liquidity-pool.ms/venv/lib/python3.9/site-packages/aiokafka/conn.py", line 263, in _do_sasl_handshake
response = await self.send(sasl_handshake)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
return fut.result()
kafka.errors.KafkaConnectionError: KafkaConnectionError: Connection at pkc-ldjyd.southamerica-east1.gcp.confluent.cloud:9092 closed
◜stopping
On localhost works fine