在Airflow DAG中,可以使用PythonOperator
和BranchPythonOperator
来创建任务依赖关系。PythonOperator
用于创建普通的任务,BranchPythonOperator
用于创建具有条件的任务。
下面是一个示例,演示了如何在Airflow DAG中创建任务依赖关系:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from datetime import datetime
default_args = {
'start_date': datetime(2021, 1, 1)
}
def task_1():
print("Task 1")
def task_2():
print("Task 2")
def task_3():
print("Task 3")
def condition():
# 根据条件返回下一个任务的名称
return 'task_2' if condition_met else 'task_3'
with DAG('dag_with_dependencies', default_args=default_args, schedule_interval=None) as dag:
t1 = PythonOperator(task_id='task_1', python_callable=task_1)
t2 = PythonOperator(task_id='task_2', python_callable=task_2)
t3 = PythonOperator(task_id='task_3', python_callable=task_3)
branch = BranchPythonOperator(task_id='branch_task', python_callable=condition)
t1 >> branch
branch >> t2
branch >> t3
在上面的示例中,定义了三个任务task_1
,task_2
和task_3
,以及一个条件函数condition
。使用PythonOperator
创建了这三个任务,并使用BranchPythonOperator
创建了一个条件分支任务branch_task
,根据condition
函数的返回值来确定下一个任务是task_2
还是task_3
。
通过>>
操作符来定义了任务之间的依赖关系,t1 >> branch
表示任务task_1
的输出将作为branch_task
的输入,branch >> t2
表示branch_task
的输出将作为task_2
的输入,branch >> t3
表示branch_task
的输出将作为task_3
的输入。
这样就创建了一个DAG,其中任务之间有依赖关系,并且根据条件不同,执行不同的任务。