在Airflow 2中,我们可以使用Python装饰器来动态地创建任务。这种方法可以让我们根据参数的不同创建不同的任务。
代码示例:
from airflow.decorators import dag, task
from datetime import datetime
@task
def print_hello(name):
return f"Hello, {name}!"
@dag(default_args={'owner': 'airflow'})
def dynamic_dag():
for i in range(1, 6):
task_name = f"task_{i}"
task_args = {"name": f"user_{i}"}
# 使用task装饰器动态地创建任务
globals()[task_name] = task(print_hello, task_id=task_name, op_kwargs=task_args)
在上面的示例代码中,我们首先导入了所需的Airflow库和datetime库。然后,我们使用@task
装饰器来创建一个名为print_hello
的任务,该任务接受一个名为name
的参数并返回一条字符串。接下来,我们使用@dag
装饰器来创建一个名为dynamic_dag
的DAG,该DAG具有默认的Airflow参数。在此DAG中,我们使用Python的range
函数创建了一个循环,该循环创建了五个任务并赋予了不同的名称和参数。我们在每次循环中使用globals()
函数动态地创建了任务,并使用print_hello
任务初始化了它们。最后,我们在任务创建后可以像往常一样使用它们。通过这种方式,我们可以轻松地创建多个具有不同参数的任务。