使用aiokafka中的创建consumer的API时,需要指定参数loop,在程序退出时关闭循环,否则可能会陷入无限循环。示例代码如下:
async def consume(loop):
consumer = AIOKafkaConsumer(
"my_topic",
bootstrap_servers="localhost:9092",
group_id="my_group",
auto_offset_reset="earliest",
loop=loop # 注意,需要传入loop
)
try:
async for message in consumer:
print(message)
finally:
await consumer.stop() # 在程序退出时关闭consumer