通常情况下,PipelineOptions对象不能被序列化,因此使用它的任何类都不能在DoFn的setup()/process()/finish()方法中声明为成员变量或参数。 解决此问题的最佳方法是:在创建Pipeline时,将PipelineOptions传递给PipelineOptionsFactory.create()方法,并将返回的Options对象传递给DoFn。 以下是可能导致此问题的示例代码:
public class MyDoFn extends DoFn {
private final MyOptions options; // MyOptions extends PipelineOptions
public MyDoFn(MyOptions options) { // constructor with MyOptions
this.options = options;
}
@ProcessElement
public void processElement(ProcessContext c) {
// use this.options ...
}
}
可以通过将构造函数参数更改为PipelineOptions以避免此问题:
public class MyDoFn extends DoFn {
private final transient PipelineOptions options;
public MyDoFn(PipelineOptions options) { // constructor with PipelineOptions
this.options = options;
}
@ProcessElement
public void processElement(ProcessContext c) {
// use this.options ...
}
}