在Airflow中,推荐的方式是通过创建一个Python模块来定义和配置DAG。以下是一个包含代码示例的解决方法:
创建一个名为dags的文件夹,并在该文件夹中创建一个Python模块(例如my_dag.py)。
在my_dag.py文件中,引入必要的模块和类:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('my_dag', default_args=default_args, schedule_interval='0 0 * * *')
在上面的代码中,我们定义了一个名为my_dag的DAG,指定了默认参数(default_args),包括所有者(owner)、开始日期(start_date)、重试次数(retries)和重试延迟(retry_delay)。我们还指定了调度间隔(schedule_interval),在这个例子中,我们将DAG设置为每天的午夜执行一次。
task1 = BashOperator(
task_id='task1',
bash_command='echo "Hello, Airflow!"',
dag=dag
)
task2 = BashOperator(
task_id='task2',
bash_command='echo "Hello again, Airflow!"',
dag=dag
)
task1 >> task2
在上面的代码中,我们定义了两个任务(task1和task2),它们分别使用BashOperator执行一个简单的bash命令。我们使用task_id参数为每个任务指定唯一的标识符,并将它们关联到DAG对象(dag=dag)。最后,我们使用>>操作符将任务1的输出连接到任务2的输入。
保存并关闭my_dag.py文件。
在Airflow的配置文件中,将dags_folder设置为刚刚创建的dags文件夹的路径。
启动Airflow的Web服务器和调度程序。
在Airflow的用户界面中,您应该能够看到名为my_dag的DAG,并能够触发它的执行。
这是一个最简单的示例,您可以根据需要添加更多的任务和操作符。