您可以使用Kafka的时间管理器(TimeBasedUUID)函数将消息的延迟时间与新消息的主题一起存储在Kafka中。 在此之后,将启动一个Kafka Consumer Group,该组将,根据您设置的时间,在指定时间后消耗并发送该消息。以下是Kafka消息延迟发送的代码示例。
在这个示例中,使用UUID来生成具有唯一ID的随机字符串。消息格式将包含(topic,message,delay)。在示例中,我们设置了一个延迟时间为3秒钟。
producer.py:
import time
import json
from kafka import KafkaProducer
from uuid import uuid4
producer = KafkaProducer(bootstrap_servers='localhost:9092')
def send_to_kafka(topic, data, delay):
obj = {"data": data, "topic": topic, "delay": delay}
future = producer.send("delayed-messages", json.dumps(obj).encode('utf-8'), key=str(uuid4()).encode('utf-8'))
result = future.get(timeout=60)
print(result)
if __name__ == "__main__":
send_to_kafka("my-topic", {"id": 1, "message": "Hello World!"}, 3)
consumer.py:
import time
import json
from kafka import KafkaConsumer, KafkaProducer
from datetime import datetime, timedelta
from uuid import UUID, uuid4
KAFKA_TOPIC = "delayed-messages"
KAFKA_CONSUMER_GROUP = "delayed-consumer-group"
KAFKA_SERVER = "localhost:9092"
consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_SERVER, groupId=KAFKA_CONSUMER_GROUP)
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for message in consumer:
value = message.value
data = json.loads(value.decode('utf-8'))
currtime = datetime.now()
timestamp = (currtime + timedelta(seconds=data["delay"])).strftime('%Y-%m-%d %H:%M:%S')
if currtime >= datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S'):
print("Message sent: " + str(value))
future = producer.send(data["topic"], json.dumps(data["data"]).encode('utf-8'))
result = future.get(timeout=60)
print(result)