如果您使用的是Airflow 2.0.2版本,则无法使用task_group功能。相反,您可以使用TaskFlow API
来组织您的DAG任务。
下面是一个示例代码,对应着通过TaskFlow API来定义DAG任务的方法:
from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'])
def example_taskflow_api():
with TaskGroup('group1') as group1:
task1 = first_task()
task2 = second_task()
task3 = third_task()
task1 >> task2 >> task3
with TaskGroup('group2') as group2:
task4 = fourth_task()
task5 = fifth_task()
task6 = sixth_task()
task4 >> task5 >> task6
task7 = seventh_task()
group1 >> task7
group2 >> task7
dag = example_taskflow_api()
在上述代码中,使用了TaskFlow API中的TaskGroup
来创建具有内部依赖关系的任务组。在每个TaskGroup
中,创建了三个任务,并通过>>
符号定义它们的依赖关系。在TaskFlow API中,这些组中的任务是可以按任意顺序执行的。在这个例子中,我们最终将两个TaskGroup
中的任务全部依赖于一个最终任务task7
。
值得注意的是,使用TaskFlow API创建的DAG不能包含循环性依赖。