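You can solve this by setting the window duration to the longest period for which Pub/Sub retains the messages, for example 1 hour. A sample implementation: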
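import json

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values (hypothetical): replace with your own project, topic and output path
project = "my-project"
topic = "my-topic"
output_path = "gs://my-bucket/output/message-counts"

# Pub/Sub is an unbounded source, so the pipeline must run in streaming mode
options = PipelineOptions(streaming=True)

# Define the window duration as 1 hour
window_duration_in_seconds = 60 * 60
# Define the Pub/Sub topic
input_topic = "projects/{project}/topics/{topic}".format(
    project=project, topic=topic)
# Define the pipeline
pipeline = beam.Pipeline(options=options)
# Read the message stream from the Pub/Sub topic; each payload arrives as bytes
messages = (
    pipeline
    | "Read Pub/Sub Messages" >> beam.io.gcp.pubsub.ReadFromPubSub(topic=input_topic)
    | "Decode Messages from JSON" >> beam.Map(json.loads))
# Attach timestamps, then assign each message to a fixed 1-hour window
fixed_windows = (
    messages
    # AddTimestampFn is the custom DoFn described in the text; it must be defined separately
    | "Add timestamp to messages" >> beam.ParDo(AddTimestampFn())
    | "Create Fixed Windows with Hourly duration" >> beam.WindowInto(
        beam.window.FixedWindows(window_duration_in_seconds))
)
# Count how often each message_id occurs within each window
# (assumes every JSON payload contains a "message_id" field)
message_count = (
    fixed_windows
    | "Extract Pub/Sub message ids" >> beam.Map(lambda message: message.get("message_id"))
    | "Count messages" >> beam.combiners.Count.PerElement())
# Write the (message_id, count) pairs to Cloud Storage.
# Depending on the Beam version, streaming file writes may require
# apache_beam.io.fileio.WriteToFiles instead of WriteToText.
message_count | "Write results to GCS" >> WriteToText(output_path)
pipeline.run().wait_until_finish()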
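In the code above, `AddTimestampFn` is a custom DoFn that attaches a timestamp to each message so that the windowing step can assign it to the correct time window. Note that `ReadFromPubSub` already stamps each element with its Pub/Sub publish time, so a custom DoFn like this is only needed if you want to window on a different time, such as an event time carried in the payload. Finally, the counts are written to Cloud Storage.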