The problem stems from the fact that Apache Beam, with its parallel and asynchronous processing, does not guarantee that records arrive at the destination file in the order they were produced. To work around this, you can build an ordered writer (OrderedWriter) that sorts the records before they are written out. The following Java example demonstrates this approach:
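The example refers to a MyRecord class and a MyRecordCoder, neither of which is provided by Beam; both stand in for your own record type and coder. A minimal sketch, assuming each record already carries a long id that defines the desired output order (the payload field and the SerializableCoder choice are illustrative assumptions):

import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;

// Hypothetical record type: carries the sequence id the pipeline sorts on.
class MyRecord implements Serializable {
  private final long id;
  private final String payload; // illustrative payload field

  MyRecord(long id, String payload) {
    this.id = id;
    this.payload = payload;
  }

  long getId() {
    return id;
  }
}

// Hypothetical coder factory matching the MyRecordCoder.of() calls below;
// SerializableCoder is the simplest coder for a Serializable POJO.
class MyRecordCoder {
  static Coder<MyRecord> of() {
    return SerializableCoder.of(MyRecord.class);
  }
}

With those in place, the pipeline itself: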
PCollection<MyRecord> records = ...; // input PCollection
PCollectionView<List<String>> filenamesView = ...; // PCollectionView of the filenames to write to
final TupleTag<Void> doneTag = new TupleTag<>();
// Key every element with the same (null) key so that the GroupByKey below
// gathers the whole collection into a single group for sorting.
PCollection<KV<Void, MyRecord>> keyedRecords =
    records.apply("AddNullKeys", WithKeys.of((Void) null))
        .setCoder(KvCoder.of(VoidCoder.of(), MyRecordCoder.of()));
// Group all elements under the single (null) key, then emit the group's records
// sorted by id.
PCollection<MyRecord> sortedRecords =
    keyedRecords
        .apply(GroupByKey.create())
        .apply(ParDo.of(new DoFn<KV<Void, Iterable<MyRecord>>, MyRecord>() {
@ProcessElement
public void processElement(ProcessContext context) {
List<MyRecord> sorted = new ArrayList<>();
for (MyRecord r : context.element().getValue()) {
sorted.add(r);
}
Collections.sort(sorted, new Comparator<MyRecord>() {
@Override
public int compare(MyRecord o1, MyRecord o2) {
// Assumes that MyRecord exposes its id as a long via getId().
return Long.compare(o1.getId(), o2.getId());
}
});
for (MyRecord r : sorted) {
context.output(r);
}
}
})).setCoder(MyRecordCoder.of());
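// Design note: the single null key sends every record to one GroupByKey group,
// so the sort above runs on a single worker. That is what establishes the total
// order, but it also removes parallelism for this step, so it is only practical
// when the data fits comfortably on one machine.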
// Re-key the sorted records so that each element is a KV pair of the target
// filename and the record that should be written to it.
// For example, if the input file was records 0, 1, 2, 3, 4, 5, 6, the output would be:
// ("file0", record 0), ("file1", record 1), ("file2", record 2), ("file3", record 3), ...
final PCollection<KV<String, MyRecord>> keyedOutput = sortedRecords
    .apply("AssignFilename", ParDo.of(new DoFn<MyRecord, KV<String, MyRecord>>() {
@ProcessElement
public void processElement(ProcessContext context) {
List<String> filenames = context.sideInput(filenamesView);