在Airflow中,可以使用ExternalTaskSensor来等待外部任务完成。当外部任务失败时,ExternalTaskSensor默认会将任务标记为失败并中止工作流程。然而,我们可以使用参数soft_fail=True
来指示ExternalTaskSensor在外部任务失败时不会导致任务本身失败。
下面是一个使用ExternalTaskSensor的示例代码,其中任务task1
等待任务task2
完成:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('example_dag', default_args=default_args, schedule_interval=None)
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)
task1 >> task2
# 在task3之前使用ExternalTaskSensor等待task2完成
task3_sensor = ExternalTaskSensor(
task_id='task3_sensor',
external_dag_id='example_dag',
external_task_id='task2',
mode='reschedule',
soft_fail=True, # 外部任务失败时不会导致task3_sensor失败
poke_interval=60, # 每60秒检查一次任务状态
timeout=3600, # 如果任务在1小时内没有完成,则超时
dag=dag
)
task3_sensor >> task3
在上面的示例中,task3_sensor
使用ExternalTaskSensor来等待任务task2
完成。当task2
失败时,task3_sensor
不会失败,而是继续等待。这样可以确保工作流程不会中止,即使外部任务失败。
上一篇:Airflow的ExternalTaskSensor卡住了
下一篇:AirflowDeferredOperator:toomanyvaluestounpack(expected2).Debuggingtrigger