When a long-running task in an Airflow DAG risks holding up other work, you can control when each task fires with the task-level trigger_rule attribute. For example, setting trigger_rule='all_success' (which is also Airflow's default) ensures a task is triggered only after all of its upstream (parent) tasks have completed successfully.
Code example:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    description='An example DAG with long-running tasks',
    schedule_interval=timedelta(days=1),
)

task1 = BashOperator(
    task_id='task_1',
    bash_command='sleep 60',
    dag=dag,
)

task2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Hello, Airflow!" && sleep 30',
    dag=dag,
)

# task_3 fires only after both of its parents succeed
# ('all_success' is also Airflow's default trigger rule).
task3 = BashOperator(
    task_id='task_3',
    bash_command='sleep 120',
    trigger_rule='all_success',
    dag=dag,
)

task1 >> task2
task1 >> task3
task2 >> task3
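To see how the common trigger rules differ, here is a minimal pure-Python sketch of their semantics. This is not Airflow code: should_run is a hypothetical helper written only for illustration, modeling how the scheduler decides whether a task may fire given the states of its upstream tasks.

```python
def should_run(rule, upstream_states):
    """Illustrative only: decide if a task fires under a given trigger rule.

    upstream_states is a list of 'success' / 'failed' strings, one per
    parent task that has finished.
    """
    if rule == 'all_success':
        # Default rule: every parent must have succeeded.
        return all(s == 'success' for s in upstream_states)
    if rule == 'all_done':
        # Every parent has finished, regardless of outcome.
        return all(s in ('success', 'failed') for s in upstream_states)
    if rule == 'one_success':
        # At least one parent succeeded.
        return any(s == 'success' for s in upstream_states)
    raise ValueError(f'unknown rule: {rule}')


print(should_run('all_success', ['success', 'failed']))  # False
print(should_run('one_success', ['success', 'failed']))  # True
```

Under 'all_success' a single failed parent blocks the task, which is why rules like 'all_done' are often used for cleanup tasks that must run even when an upstream step fails.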