要使用Airflow创建DAG,可以使用Airflow提供的Python API或函数。
首先,需要导入相关的模块和类:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
然后,可以使用DAG类来创建一个DAG对象,指定DAG的参数,如开始日期、调度间隔等:
dag = DAG(
'my_dag',
description='My first DAG',
schedule_interval='0 0 * * *',
start_date=datetime(2022, 1, 1),
catchup=False
)
接下来,可以使用Operator类来创建任务节点,比如使用DummyOperator类创建一个虚拟任务节点:
task1 = DummyOperator(
task_id='task1',
dag=dag
)
然后可以通过设置任务之间的依赖关系来构建DAG的结构:
task2 = DummyOperator(
task_id='task2',
dag=dag
)
task1 >> task2
最后,返回DAG对象:
return dag
下面是完整的示例代码:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
def create_dag():
dag = DAG(
'my_dag',
description='My first DAG',
schedule_interval='0 0 * * *',
start_date=datetime(2022, 1, 1),
catchup=False
)
task1 = DummyOperator(
task_id='task1',
dag=dag
)
task2 = DummyOperator(
task_id='task2',
dag=dag
)
task1 >> task2
return dag
可以将上述代码保存为一个Python文件,然后在Airflow中使用 import
语句导入该函数,并调用它来创建DAG对象。