以下是一个简单的示例代码,演示了如何按顺序消费消息,先消费主题1的消息,再消费主题2的消息。
import threading
import time
from queue import Queue
class Message:
def __init__(self, topic, content):
self.topic = topic
self.content = content
def consumer1(queue):
while True:
message = queue.get()
if message.topic == 'topic1':
print(f'Consumer 1 consuming message from topic1: {message.content}')
elif message.topic == 'topic2':
queue.put(message) # 将主题2的消息重新放回队列
break
queue.task_done()
def consumer2(queue):
while True:
message = queue.get()
if message.topic == 'topic2':
print(f'Consumer 2 consuming message from topic2: {message.content}')
queue.task_done()
def main():
queue = Queue()
# 创建消费者线程
thread1 = threading.Thread(target=consumer1, args=(queue,))
thread2 = threading.Thread(target=consumer2, args=(queue,))
# 启动消费者线程
thread1.start()
thread2.start()
# 模拟生产消息
messages = [
Message('topic1', 'Message 1 from topic1'),
Message('topic1', 'Message 2 from topic1'),
Message('topic2', 'Message 1 from topic2'),
Message('topic1', 'Message 3 from topic1'),
Message('topic2', 'Message 2 from topic2'),
]
# 将消息放入队列
for message in messages:
queue.put(message)
# 阻塞,直到队列中的所有消息都被处理完
queue.join()
# 等待消费者线程结束
thread1.join()
thread2.join()
if __name__ == '__main__':
main()
在上面的示例中,我们使用了queue.Queue
作为消息队列,创建了两个消费者线程consumer1
和consumer2
。consumer1
先消费主题1的消息,如果收到主题2的消息,则将消息重新放回队列,并退出循环。consumer2
只消费主题2的消息。
在main
函数中,我们模拟了一些消息,并将它们放入队列中。然后,我们启动消费者线程,并使用queue.join()
阻塞,直到队列中的所有消息都被处理完。最后,我们等待消费者线程结束。
运行上述代码,输出将按顺序消费主题1的消息,然后再消费主题2的消息:
Consumer 1 consuming message from topic1: Message 1 from topic1
Consumer 1 consuming message from topic1: Message 2 from topic1
Consumer 1 consuming message from topic1: Message 3 from topic1
Consumer 2 consuming message from topic2: Message 1 from topic2
Consumer 2 consuming message from topic2: Message 2 from topic2
下一篇:按顺序写出数组的几个迭代。