import pika

# Consumer: bind a durable queue to a topic exchange and print every
# message delivered to it.
consumer = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = consumer.channel()

queue_name = 'my_queue'
channel.queue_declare(queue=queue_name, durable=True)

exchange_name = 'my_exchange'
channel.exchange_declare(exchange=exchange_name, exchange_type='topic')
channel.queue_bind(
    queue=queue_name,
    exchange=exchange_name,
    routing_key='my_routing_key',
)


def callback(ch, method, properties, body):
    """Print the decoded payload of one delivered message.

    Args:
        ch: Channel the message arrived on (unused).
        method: Delivery metadata (unused).
        properties: Message properties (unused).
        body: Message payload; must support ``.decode()``.
    """
    print("Received message:", body.decode())


# auto_ack=True: no explicit ack is sent from the callback.
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True,
)

try:
    channel.start_consuming()  # start consuming
finally:
    # Release the connection however the consume loop ends (error, Ctrl-C).
    # The is_open guard avoids masking the original exception when the
    # connection is already gone.
    if consumer.is_open:
        consumer.close()
import pika
# Producer: publish a single message to a durable queue, then disconnect.
producer = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
try:
    channel = producer.channel()

    queue_name = 'my_queue'
    channel.queue_declare(queue=queue_name, durable=True)

    # Publish through the '' exchange, using the queue name as routing key.
    channel.basic_publish(
        exchange='',
        routing_key=queue_name,
        body='Hello, world!',
        # delivery_mode=2 marks the message as persistent.
        properties=pika.BasicProperties(delivery_mode=2),
    )
finally:
    # Close the connection even if declaring or publishing raised; the
    # is_open guard keeps a failed close from masking the original error.
    if producer.is_open:
        producer.close()
def callback(ch, method, properties, body):
    """Print a delivered message, then ack or nack it explicitly.

    Args:
        ch: Channel the message arrived on; its ``basic_ack`` or
            ``basic_nack`` method is called to settle the delivery.
        method: Delivery metadata; only ``delivery_tag`` is read.
        properties: Message properties (unused).
        body: Message payload; must support ``.decode()``.
    """
    print("Received message:", body.decode())
    # NOTE(review): `some_error` is not defined in the visible code; it looks
    # like a placeholder for the real failure check - confirm or replace.
    if some_error:
        # On error, reject the message and put it back on the queue.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
    else:
        # Handled successfully: acknowledge the message.
        ch.basic_ack(delivery_tag=method.delivery_tag)
# Register `callback` with automatic acknowledgement switched off, so each
# delivery is settled by the callback itself.
# NOTE(review): `channel` and `queue_name` come from earlier in the file; the
# last visible assignment of `channel` is on a connection that is closed
# before this point - confirm this snippet runs with a live consumer channel.
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=False,
)

# Start consuming.
channel.start_consuming()