在Apache Beam中,要返回多个输出,可以使用多路输出(MultiOutput)。
首先,需要创建一个新的PCollectionTuple对象,将每个输出分别命名。然后,在DoFn的processElement方法中,根据需要将每个元素添加到正确的输出中。
以下是创建具有两个输出的示例DoFn的示例代码:
public class MultipleOutputsDoFn extends DoFn {
public static final TupleTag outputTag1 =
new TupleTag(){};
public static final TupleTag outputTag2 =
new TupleTag(){};
@ProcessElement
public void processElement(ProcessContext c) {
String element = c.element();
if (condition1) {
c.output(outputTag1, "output1_" + element);
}
if (condition2) {
c.output(outputTag2, "output2_" + element);
}
}
}
在这个例子中,我们定义了两个TupleTag,用于标识两个不同的输出。在processElement方法中,我们可以根据某些条件向不同的输出添加元素。
在主管道中,可以使用配对和Map方法来处理这些输出。以下是一个使用具有两个输出的DoFn的示例管道:
PCollection input = ...;
TupleTag outputTag1 = ...;
TupleTag outputTag2 = ...;
PCollectionTuple outputs = input.apply(ParDo.of(new MultipleOutputsDoFn())
.withOutputTags(outputTag1, TupleTagList.of(outputTag2)));
PCollection output1 = outputs.get(outputTag1);
PCollection output2 = outputs.get(outputTag2);
PCollection finalOutput =
PCollectionList.of(output1).and(output2).apply(Flatten.pCollections());
在这个例子中,我们使用withOutputTags方法将两个TupleTag与DoFn配对,然后将它们作为PCollectionTuple输出。然后,我们使用PCollectionTuple.get方法来获得每个输出的