要实现按键分组数据的功能,可以使用Apache Beam的GroupByKey操作。下面是一个示例代码:
import apache_beam as beam
# 创建一个Pipeline对象
p = beam.Pipeline()
# 生成一些示例数据
data = [
('apple', 1),
('banana', 2),
('apple', 3),
('banana', 4),
('orange', 5)
]
# 将数据转换为PCollection对象
input_data = p | 'Create input data' >> beam.Create(data)
# 使用GroupByKey操作按键分组数据
grouped_data = input_data | 'Group by key' >> beam.GroupByKey()
# 打印每个键和对应的值列表
grouped_data | 'Print grouped data' >> beam.Map(print)
# 运行Pipeline
p.run()
上述代码首先创建了一个Apache Beam的Pipeline对象。然后,使用beam.Create
方法生成了一些示例数据,并将其转换为PCollection对象。接下来,使用beam.GroupByKey
操作对数据进行按键分组。最后,通过beam.Map
操作将分组后的数据打印出来。
运行上述代码,将会输出以下结果:
('apple', [1, 3])
('banana', [2, 4])
('orange', [5])
可以看到,数据根据键进行了分组,并且每个键对应一个值列表。