在Airflow中,设置外部任务传感器需要使用ExternalTaskSensor
。ExternalTaskSensor
允许DAG等待其他DAG的任务完成后继续执行。
为了设置不同的调度间隔,可以使用BaseSensorOperator
类,然后在其中调用poke
方法以检查外部任务是否已完成。以下是一个示例代码,演示如何设置调度间隔为5分钟的ExternalTaskSensor
。
from airflow import DAG
from airflow.operators.sensor import BaseSensorOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import datetime, timedelta
from airflow.operators.external_task_sensor import ExternalTaskSensor
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1)
}
dag = DAG('my_sensor_dag', default_args=default_args, schedule_interval=timedelta(minutes=5))
wait_for_external_task = ExternalTaskSensor(
task_id='wait_for_external_task',
external_dag_id='other_dag_id',
external_task_id='other_task_id',
poke_interval=60, # 检查外部任务完成的频率为1分钟一次
timeout=60*60, # 等待外部任务的超时时间为1小时
dag=dag
)
do_something = DummyOperator(
task_id='do_something',
dag=dag
)
wait_for_external_task >> do_something
以上代码中,ExternalTaskSensor
的poke_interval
参数设置为60(即1分钟),这意味着外部任务每分钟会被检查一次。timeout
参数设置为3600(即1小时),这意味着如果外部任务在等待1小时后仍未完成,则DAG将自动中止。
此外,您还必须设置external_dag_id
和external_task_id
参数,以指定需要等待的外部任务。在这个示