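To launch a Dataflow job from BeamRunPythonPipelineOperator without blocking the Airflow task until the job finishes, set wait_until_finished to False. The option is not an argument of the operator itself: it is passed through the operator's dataflow_config argument (a DataflowConfiguration object). For example:
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

# Options forwarded to the Beam pipeline itself.
pipeline_options = {
    "staging_location": "gs://my-bucket/staging",
    "temp_location": "gs://my-bucket/temp",
}

# Dataflow-specific settings. wait_until_finished=False asks the task to
# return once the job has been submitted instead of waiting for completion.
dataflow_config = DataflowConfiguration(
    job_name="my_job_name",
    project_id="my-gcp-project",
    wait_until_finished=False,
)

run_python_pipeline_op = BeamRunPythonPipelineOperator(
    task_id="run_my_beam_job",
    py_file="/path/to/my/beam_job.py",
    runner="DataflowRunner",
    pipeline_options=pipeline_options,
    dataflow_config=dataflow_config,
)
This example keeps BeamRunPythonPipelineOperator, selects the Dataflow runner with runner="DataflowRunner", and sets wait_until_finished=False inside dataflow_config, which is where the operator reads the option from. Declare the operator inside a DAG and let the Airflow scheduler run it, rather than calling its execute method by hand. If the task still waits for the job to finish, the installed apache-airflow-providers-apache-beam version may be affected by the problem of the operator not honoring wait_until_finished, so checking the provider version is a reasonable next step.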
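The same settings can also be collected as plain keyword arguments, since dataflow_config may be given as a dictionary instead of a DataflowConfiguration object. The sketch below is only illustrative: build_beam_task_kwargs is a hypothetical helper, not part of Airflow, and the bucket, project, and file names are the placeholder values from the example. It runs without Airflow installed, so the operator call itself is shown in a comment.

```python
from typing import Any


def build_beam_task_kwargs(
    task_id: str,
    py_file: str,
    project_id: str,
    job_name: str,
    staging_location: str,
    temp_location: str,
) -> dict[str, Any]:
    """Collect keyword arguments for a non-blocking Dataflow launch.

    Hypothetical helper: it only builds a dictionary and does not
    import or call Airflow.
    """
    return {
        "task_id": task_id,
        "py_file": py_file,
        "runner": "DataflowRunner",
        # Options forwarded to the Beam pipeline.
        "pipeline_options": {
            "staging_location": staging_location,
            "temp_location": temp_location,
        },
        # Dataflow settings; the wait flag lives here, not on the operator.
        "dataflow_config": {
            "job_name": job_name,
            "project_id": project_id,
            "wait_until_finished": False,
        },
    }


kwargs = build_beam_task_kwargs(
    task_id="run_my_beam_job",
    py_file="/path/to/my/beam_job.py",
    project_id="my-gcp-project",
    job_name="my_job_name",
    staging_location="gs://my-bucket/staging",
    temp_location="gs://my-bucket/temp",
)

# Inside a DAG definition, with the Apache Beam and Google providers installed:
# from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
# start_job = BeamRunPythonPipelineOperator(**kwargs)
```

Keeping the arguments in one dictionary makes it easy to confirm in a unit test that wait_until_finished sits under dataflow_config before the DAG is deployed.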