当使用无界侧输入处理数据流时,Beam 可能会一直阻塞:像 Pub/Sub 这样的无界数据源通常不会发出终止信号,管道会无限期地等待其数据。
为解决此问题,可以使用全局窗口并配置触发器(trigger),在积累足够的数据后输出结果。以下是一个示例:
import apache_beam as beam
from apache_beam import ParDo, DoFn, window
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode, AfterProcessingTime
class MyDoFn(DoFn):
    """DoFn that processes main-input elements together with a side input.

    Both processing methods are placeholders in this example; their bodies
    are intentionally left as ``...``.
    """

    def __init__(self):
        # Run the DoFn base-class initialiser before adding our own state.
        super().__init__()
        self.side_input = []

    def process(self, element, side_input):
        """Process one main-input element using the side input.

        Args:
            element: The element taken from the main input.
            side_input: The side-input value passed as the extra ParDo
                argument of the same name.
        """
        # Placeholder: use side_input to process the element.
        ...

    def process_side_input(self, element):
        """Handle one element of the unbounded side input (placeholder).

        NOTE(review): this is not one of Beam's DoFn lifecycle methods, so it
        only runs if something calls it explicitly -- confirm intended use.
        """
        ...
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    # In the global window an unbounded source never lets the watermark reach
    # the end of the window, so the default trigger would never fire and the
    # side input would never become available. Fire on processing time
    # instead: early panes 120 s after the first element of a pane, late
    # panes 240 s after the first late element.
    # AfterProcessingTime takes the delay in seconds; the accumulation mode
    # is an argument of WindowInto, not of the trigger.
    trigger = AfterWatermark(
        early=AfterProcessingTime(120),
        late=AfterProcessingTime(240))

    # Read the messages of the main input from a Pub/Sub subscription.
    pubsub_messages = p | ReadFromPubSub(subscription='projects/project-id/subscriptions/subscription-id')

    # Build the unbounded side input. The subscription below is a placeholder
    # for the real side-input source. The trigger is attached here because
    # this is the PCollection the ParDo would otherwise wait on forever.
    side_input = (p
                  | "Read Side Input" >> ReadFromPubSub(
                      subscription='projects/project-id/subscriptions/side-input-subscription-id')
                  | "Window Side Input" >> beam.WindowInto(
                      window.GlobalWindows(),
                      trigger=trigger,
                      accumulation_mode=AccumulationMode.DISCARDING))

    # Process the main input in the global window, handing the triggered side
    # input to MyDoFn.process as a list.
    results = (pubsub_messages
               | "Assign to Global Window" >> beam.WindowInto(window.GlobalWindows())
               | "Parse Data and Process" >> beam.ParDo(
                   MyDoFn(),
                   side_input=beam.pvalue.AsList(side_input))
               .with_output_types(str))

    # Write the processed results to a Pub/Sub topic.
    results | beam.io.WriteToPubSub('projects/project-id/topics/topic-id')
在上面的示