Airflow 2.3支持使用装饰器来动态地映射任务。使用这种方法,可以根据 DAG 配置文件中的参数创建一个包含任务的列表(task_list),并将其传递到装饰器中运行时。
以下是一个示例 DAG,显示了如何使用装饰器来动态地映射任务:
from airflow.decorators import dag, task
from datetime import datetime
@dag(default_args=default_args, schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False)
def dynamic_task_mapping():
def create_task_list():
task_list = []
for i in range(3):
task_func = task_decorator(task_id=f"task_{i}", bash_command=f"echo {i}")
task_list.append(task_func)
return task_list
@task
def start_task():
return "Starting the DAG"
@task
def end_task():
return "Ending the DAG"
@dag_id.create_dagrun(run_id=DagRunType.MANUAL_TRIGGER, execution_date=datetime.now())
def dynamic_mapping_dag(task_list=create_task_list()):
start_task()
for task in task_list:
task()
end_task()
dynamic_mapping_dag()
在这个示例中,我们创建了一个名为 dynamic_task_mapping()
的 DAG,使用了默认参数和 datetime
库。我们也使用了 dag_id
对象来创建 DAG。我们还定义了三个任务:
start_task()
任务,它返回“Starting the DAG”。end_task()
任务,返回“Ending the DAG”。dynamic_mapping_dag()
任务,其中 task_list 是一个包含多个任务的列表。我们通过传入 task_list 的方式来动态地映射任务。我们使用 create_task_list()
函数来创建将添加到任务列表中的单个任务。每个任务包含 ID(task_id
)和要执行的命令(bash_command
)。
我们将 `task