以下是使用Apache Beam按键对所有窗口的PCollection
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.Duration;
public class SumByKeyInAllWindows {
public static void main(String[] args) {
// 创建Pipeline
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
// 创建输入数据集
PCollection> input = pipeline
.apply("Generate input", Create.of(
KV.of("key1", 1),
KV.of("key2", 2),
KV.of("key1", 3),
KV.of("key2", 4),
KV.of("key2", 5)
));
// 应用窗口
PCollection> sumPerKey = input
.apply("Apply fixed windows", Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply("Sum per key", Sum.perKey());
// 输出结果
sumPerKey.apply("Print results", ParDo.of(new DoFn, Void>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println(c.element());
}
}));
// 运行Pipeline
pipeline.run().waitUntilFinish();
}
}
在此示例中,我们首先创建一个输入PCollection
然后,我们应用了一个Fixed Windows,将输入数据按1分钟的时间窗口进行分组。
接下来,我们使用Sum.perKey()转换对每个键的值进行求和。
最后,我们将结果打印出来,通过ParDo转换应用一个DoFn来处理每个元素。
最后,我们运行Pipeline并等待其完成。