可以尝试指定 Dataflow 的 Google Cloud Platform 项目 ID 和区域以解决该问题。示例如下:
from airflow.contrib.operators.dataflow_operator import DataflowPythonOperator
dataflow_task = DataflowPythonOperator(
task_id='my_dataflow_task',
py_file='/path/to/my/python/pipeline.py',
gcp_conn_id='my_gcp_connection',
project='my_gcp_project_id',
region='my_gcp_region',
dataflow_default_options={'project': 'my_gcp_project_id', 'region': 'my_gcp_region'}
)
其中,my_gcp_project_id
是你的 Google Cloud Platform 项目 ID,my_gcp_region
是你的 Dataflow 区域。还需要确保你的 Google Cloud Platform 凭据已经配置并授权给了 Airflow。