问题描述: 在使用aio-pika库进行异步操作RabbitMQ时,当使用asyncio.run()运行时,连接似乎没有正常关闭。
解决方法:
import asyncio
from aio_pika import connect, Message
async def consume():
# 创建连接
connection = await connect("amqp://guest:guest@localhost/")
try:
# 创建通道
channel = await connection.channel()
# 声明队列
queue = await channel.declare_queue("my_queue")
# 接收消息
async with queue.iterator() as queue_iter:
async for message in queue_iter:
print(message.body.decode())
await message.ack()
# 停止循环
if message.body == b"stop":
break
finally:
# 关闭连接
await connection.close()
# 使用asyncio.run()运行
if __name__ == "__main__":
asyncio.run(consume())
import asyncio
from aio_pika import connect, Message
async def consume():
# 创建连接
connection = await connect("amqp://guest:guest@localhost/")
try:
# 创建通道
channel = await connection.channel()
# 声明队列
queue = await channel.declare_queue("my_queue")
# 接收消息
async with queue.iterator() as queue_iter:
async for message in queue_iter:
print(message.body.decode())
await message.ack()
# 停止循环
if message.body == b"stop":
break
finally:
# 关闭连接
await connection.close()
# 创建事件循环
loop = asyncio.get_event_loop()
# 运行消费者函数
loop.run_until_complete(consume())
# 关闭事件循环
loop.close()
以上两种方法都可以确保在消费者函数运行结束后正确关闭连接。
上一篇:aio-pika的两个消费者通过扇出模式接收消息,但不是同时接收。
下一篇:aiobotocore-AttributeError:'ClientCreatorContext'对象没有属性'send_message'