在Airflow2中,可以使用PythonOperator来动态创建DAG。下面是一个示例代码,它将在任务运行时创建一个DAG并在任务完成后运行它。
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def create_dag(dag_id, schedule, default_args):
dag = DAG(dag_id,
default_args=default_args,
schedule_interval=schedule)
def print_hello():
print('hello')
with dag:
task = PythonOperator(task_id='print_hello',
python_callable=print_hello)
return dag
default_args = {
'start_date': datetime(2021, 8, 1),
}
dag_params = [
{'dag_id': 'dynamic_dag_1', 'schedule': '*/5 * * * *'},
{'dag_id': 'dynamic_dag_2', 'schedule': '0 0 * * MON'}
]
for params in dag_params:
dag_id = params['dag_id']
schedule = params['schedule']
dag = create_dag(dag_id, schedule, default_args)
globals()[dag_id] = dag
在此示例中,我们使用循环来遍历要创建的DAG的参数列表。然后,我们调用一个名为create_dag的函数来创建DAG对象。这个函数使用DAG构造函数和PythonOperator来创建一个任务。最后,我们使用Python的globals()函数将DAG对象添加到全局变量中,以便Airflow可以正确识别它们。
当任务运行时,它会调用create_dag函数,并动态地创建和运行DAG。当DAG运行完成后,我们可以在Airflow UI中看到它的状态和运行日志。