Apache Beam的Combine操作可以对数据流的每个键值对进行聚合操作。如果需要使用组合键(由多个键构成的组合),则需要使用多个键值对作为输入。代码示例如下:
import apache_beam as beam
# 定义多键PTransform
class MultiKeys(beam.DoFn):
def process(self, element):
# 根据需要的键构建组合键
composite_key = (element['key1'], element['key2'])
# 返回组合键和对应的值
yield (composite_key, element['value'])
# 创建PCollection并使用Combine操作
with beam.Pipeline() as pipeline:
data = pipeline | beam.Create([
{'key1': 'a', 'key2': 'x', 'value': 1},
{'key1': 'b', 'key2': 'y', 'value': 2},
{'key1': 'a', 'key2': 'y', 'value': 3},
{'key1': 'a', 'key2': 'x', 'value': 4},
])
result = data | beam.ParDo(MultiKeys()) | beam.CombinePerKey(sum)
result | beam.Map(print)
输出为:
(('a', 'x'), 5)
(('a', 'y'), 3)
(('b', 'y'), 2)
在本示例中,使用MultiKeys DoFn将多个键合并为一组合键,并通过CombinePerKey操作对每个组合键的所有值进行求和操作。最后,使用Map操作打印结果。