以下是一个使用Airflow外部任务传感器的代码示例:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
# 定义DAG的默认参数
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
# 定义DAG
dag = DAG(
'external_task_sensor_example',
default_args=default_args,
description='An example DAG with external task sensor',
schedule_interval=timedelta(days=1),
)
# 定义一个外部任务传感器,用于等待外部任务完成
task_sensor = ExternalTaskSensor(
task_id='wait_for_external_task',
external_dag_id='external_dag_id', # 外部DAG的ID
external_task_id='external_task_id', # 外部任务的ID
timeout=3600, # 超时时间(以秒为单位)
dag=dag,
)
# 定义一个PythonOperator任务,用于执行其他操作
def process_data():
# 在这里执行其他操作,例如处理数据或运行其他任务
print("Processing data...")
process_data_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
dag=dag,
)
# 设置任务之间的依赖关系
task_sensor >> process_data_task
在上面的代码中,我们首先导入所需的模块和类。然后,我们定义了DAG的默认参数,包括所有任务共享的一些配置。接下来,我们定义了一个DAG对象,并指定了DAG的名称、默认参数、描述和调度间隔。
然后,我们定义了一个外部任务传感器(ExternalTaskSensor),用于等待外部任务完成。我们需要指定外部DAG的ID和外部任务的ID,以及超时时间。外部任务传感器将等待指定的外部任务完成,然后才会继续执行后续任务。
接下来,我们定义了一个PythonOperator任务,用于执行其他操作。在这个示例中,我们只是打印一条消息,但你可以在这里执行任何其他操作,例如处理数据或运行其他任务。
最后,我们设置了任务之间的依赖关系,将外部任务传感器的输出连接到PythonOperator任务。这意味着当外部任务传感器等待的外部任务完成后,才会执行PythonOperator任务。
请根据你的实际需求修改代码中的外部DAG和任务的ID,并根据需要添加其他任务和依赖关系。