在Airflow中,您可以使用ExternalTaskSensor
传感器来在另一个传感器任务中使用传感器。下面是一个示例代码,演示如何在一个任务中等待另一个任务完成:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'sensor_example',
default_args=default_args,
schedule_interval='@daily',
)
task1 = DummyOperator(task_id='task1', dag=dag)
# 定义一个传感器任务,依赖于task1完成
task2 = ExternalTaskSensor(
task_id='task2',
external_dag_id='sensor_example',
external_task_id='task1',
mode='reschedule',
poke_interval=60, # 每60秒检查一次task1是否完成
timeout=3600, # 等待task1最长1小时
dag=dag,
)
task3 = DummyOperator(task_id='task3', dag=dag)
task1 >> task2 >> task3
在上面的代码中,有三个任务:
task1
是一个虚拟任务(使用DummyOperator
),它在DAG中起到一个起始点的作用。task2
是一个传感器任务(使用ExternalTaskSensor
),它依赖于task1
任务的完成。external_dag_id
参数指定了要等待的DAG ID,external_task_id
参数指定了要等待的任务 ID。task3
是一个虚拟任务,它在task2
完成后执行。在这个例子中,task2
将等待task1
完成,然后才会继续执行。传感器的mode
参数设置为reschedule
,这意味着如果task1
没有完成,传感器将会定期重新调度自己以等待任务完成。poke_interval
参数设置为60秒,表示每60秒检查一次任务的状态。timeout
参数设置为3600秒,表示最长等待时间为1小时。
请根据您的实际需求调整传感器的超时时间和检查间隔。