可以通过设置参数"trigger_rule"为"all_done"和"one_failed"来解决此问题。"all_done"表示所有上游任务都已执行结束(无论成功、失败还是被跳过)后才会触发下游任务,而"one_failed"表示只要有至少一个上游任务失败,就会立即触发下游任务,无需等待其余上游任务执行结束(上游任务被跳过不算失败)。
示例代码:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.task_group import TaskGroup
# Defaults applied to every task in the DAG unless a task overrides them.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2022, 1, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Daily DAG that demonstrates non-default trigger rules.
dag = DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

# NOTE: ``trigger_rule`` is an operator argument, not a TaskGroup argument
# (passing it to TaskGroup raises TypeError), so it is set on each task
# inside the group instead.

# group_a: tasks run once all of their upstream tasks have finished,
# whatever the outcome (success, failure or skip).
with TaskGroup('group_a', dag=dag) as group_a:
    task_a = BashOperator(
        task_id='task_a',
        bash_command='echo "Task A"',
        trigger_rule='all_done',
        dag=dag,
    )
    task_c = BashOperator(
        task_id='task_c',
        bash_command='echo "Task C"',
        trigger_rule='all_done',
        dag=dag,
    )

# group_b: tasks run as soon as at least one upstream task has failed.
with TaskGroup('group_b', dag=dag) as group_b:
    task_b = BashOperator(
        task_id='task_b',
        bash_command='echo "Task B"',
        trigger_rule='one_failed',
        dag=dag,
    )
    task_d = BashOperator(
        task_id='task_d',
        bash_command='echo "Task D"',
        trigger_rule='one_failed',
        dag=dag,
    )

# Final task outside both groups; keeps the default trigger rule.
task_e = BashOperator(
    task_id='task_e',
    bash_command='echo "Task E"',
    dag=dag,
)

# task_a -> task_b -> (task_c, task_d in parallel) -> task_e
task_a >> task_b >> [task_c, task_d] >> task_e