首先,需要增加对异常的捕捉和处理。比如,可以使用 try...except 捕捉连接失败的异常,并打印错误信息。
import asyncio
from aiokafka import AIOKafkaProducer
async def send():
producer = AIOKafkaProducer(
bootstrap_servers='localhost:9092')
try:
await producer.start()
except Exception as e:
print(f"Connect failed: {e}")
return False
await producer.send_and_wait("topic", b"key", b"value")
await producer.stop()
return True
loop = asyncio.get_event_loop()
loop.run_until_complete(send())
其次,可以通过设置连接的超时时间,来判断连接是否成功。如果连接超时,就认为连接失败了。
import asyncio
from aiokafka import AIOKafkaProducer
async def send():
producer = AIOKafkaProducer(
bootstrap_servers='localhost:9092',
request_timeout_ms=5000) # 增加连接超时设置,5000毫秒
try:
await producer.start()
except asyncio.TimeoutError as e:
print(f"Connect timeout: {e}")
return False
except Exception as e:
print(f"Connect failed: {e}")
return False
await producer.send_and_wait("topic", b"key", b"value")
await producer.stop()
return True
loop = asyncio.get_event_loop()
loop.run_until_complete(send())