Agents of Faust streaming stop event consumption after some days”改写为“Agents of Faust流媒体停止事件消费数天后”,解决该问题的解法是利用Kafka消费者API中offsets的自动提交机制来维护消息的消费状态,消费者定期提交offsets即可保证消息不被重复消费。示例如下:
from kafka import KafkaConsumer
consumer = KafkaConsumer('agent_of_faust', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, group_id='my-group', max_poll_records=500, value_deserializer=lambda m: json.loads(m.decode('utf-8')))
for message in consumer: # 处理消息 print(message.value) # 手动提交offsets consumer.commit()