Apache Beam支持将单个PCollection划分为多个PCollection,并在管道中的多个位置使用这些PCollection。但是,有时候我们需要在管道中的某个位置将元素发射到多个不同的PCollection中。这时候可能会遇到问题。
在现有版本的Apache Beam中,我们可以使用一个自定义的转换器,称为MultiDoFn,来支持将元素发射到多个不同的PCollection中。MultiDoFn接受一个包含多个DoFn实例的列表,每个DoFn实例发射到不同的输出PCollection。
下面是一个示例,说明如何使用MultiDoFn来将PCollection中的汽车对象分成两个不同的输出PCollection,一个PCollection包含旧车,另一个PCollection包含新车。
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MultiDoFn;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
class Car {
public String model;
public int year;
public Car(String model, int year) {
this.model = model;
this.year = year;
}
}
class CarSplitter extends MultiDoFn {
TupleTag oldCars = new TupleTag("old_cars"){};
TupleTag newCars = new TupleTag("new_cars"){};
@Override
public TupleTagList getAdditionalOutputTags() {
return TupleTagList.of(oldCars).and(newCars);
}
@Override
public void processElement(ProcessContext c) {
Car car = c.element();
if (car.year < 2000) {
c.output(oldCars, car);
} else {
c.output(newCars, car);
}
}
}
在代码中,我们首先定义一个Car类,然后定义一个CarSplitter类,实现了MultiDoFn接口。在这个类中,我们定义