Apache Beam provides the Splittable DoFn API, which lets users write DoFns whose per-element work can be split and processed in parallel, including in streaming processing steps.
Here is an example that uses a Splittable DoFn:
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

public class MyDoFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
    // Claim each offset before doing the work it represents. tryClaim returns
    // false once the restriction is exhausted or has been split away, so no
    // separate range check is needed in the loop condition.
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      // Process the piece of work at index i.
      c.output("Processed element " + i);
    }
  }

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String element) {
    // The initial restriction describes the full range of work for this element;
    // the runner may later divide it via @SplitRestriction or dynamic splitting.
    return new OffsetRange(0L, 100L);
  }

  @SplitRestriction
  public void splitRestriction(
      @Element String element,
      @Restriction OffsetRange restriction,
      OutputReceiver<OffsetRange> receiver) {
    // Split the given range into sub-ranges that can be processed independently.
    long midpoint = (restriction.getFrom() + restriction.getTo()) / 2;
    receiver.output(new OffsetRange(restriction.getFrom(), midpoint));
    receiver.output(new OffsetRange(midpoint, restriction.getTo()));
  }

  @NewTracker
  public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
    // OffsetRangeTracker is the SDK-provided tracker for OffsetRange restrictions.
    return new OffsetRangeTracker(range);
  }
}
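To see the midpoint-split arithmetic from splitRestriction in isolation, here is a minimal, self-contained sketch in plain Java (no Beam dependency; `SplitDemo` and its `split` helper are hypothetical names introduced only for illustration):

```java
public class SplitDemo {
  // Split the half-open range [from, to) at its midpoint into two adjacent
  // sub-ranges, mirroring the arithmetic used in splitRestriction above.
  static long[][] split(long from, long to) {
    long mid = (from + to) / 2;
    return new long[][] { { from, mid }, { mid, to } };
  }

  public static void main(String[] args) {
    long[][] parts = split(0L, 100L);
    // The two halves together cover the original range with no gap or overlap.
    System.out.println(parts[0][0] + "-" + parts[0][1]); // 0-50
    System.out.println(parts[1][0] + "-" + parts[1][1]); // 50-100
  }
}
```

Because the sub-ranges share the midpoint as an exclusive/inclusive boundary, every offset in the original restriction is claimed exactly once regardless of how often splitting occurs.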
This code defines a splittable DoFn that processes elements from an input stream of strings and outputs the processed strings.
We can use this DoFn for streaming processing in an Apache Beam pipeline, as follows:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
p.apply("ReadFromSource", TextIO.read().from("inputFile.txt"))
.apply(ParDo.of(new MyDoFn()))
.apply("WriteToSink", TextIO.write().to("outputFile.txt"));
p.run().waitUntilFinish();
This code reads a stream of strings from the input file, processes each element with the splittable MyDoFn, and writes the results to the output file.