可以使用BranchPythonOperator和TriggerRule来实现该功能。下面是示例代码:
from airflow.operators.python_operator import BranchPythonOperator
def decide_which_path_to_take(**kwargs):
upstream_task_ids = ["task1", "task2", "task3"]
upstream_status = [kwargs["ti"].xcom_pull(task_ids=i) for i in upstream_task_ids]
if all(status == "success" for status in upstream_status):
return "success_path"
else:
return "failure_path"
branch_op = BranchPythonOperator(
task_id='task_branch',
python_callable=decide_which_path_to_take,
provide_context=True,
trigger_rule='all_success'
)
在上述代码中,我们首先定义了一个Python函数decide_which_path_to_take,它将获取指定的上游任务ID的状态,如果它们全部都成功,则返回"success_path",否则返回"failure_path"。
然后我们使用BranchPythonOperator将该函数封装成一个任务,并将其设置为“all_success”触发规则,这意味着只有当所有上游任务都成功完成时才会运行该任务。