Airflow是一个开源的任务调度和工作流管理平台,可以使用Python编写任务的调度逻辑。下面是一个示例代码,演示如何使用Airflow来调度DAG(有向无环图)。
首先,需要安装Airflow。可以使用以下命令安装Airflow:
pip install apache-airflow
接下来,在Python脚本中导入Airflow的相关模块:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
然后,创建一个DAG对象,定义任务的调度逻辑:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'example_dag',
default_args=default_args,
description='An example DAG',
schedule_interval=timedelta(days=1),
)
在上面的代码中,我们定义了一个名为example_dag
的DAG,它会每天执行一次。
接下来,我们可以定义具体的任务(操作符),例如一个BashOperator任务,该任务执行一个Shell命令:
task1 = BashOperator(
task_id='task1',
bash_command='echo "Hello World"',
dag=dag,
)
在上面的代码中,我们定义了一个名为task1
的任务,它执行一个echo "Hello World"
的Shell命令。
最后,我们可以定义任务之间的依赖关系,即前一个任务执行完后才能执行下一个任务:
task2 = BashOperator(
task_id='task2',
bash_command='echo "Task 2"',
dag=dag,
)
task2.set_upstream(task1)
在上面的代码中,我们定义了一个名为task2
的任务,它会在task1
任务执行完后执行。通过set_upstream
方法来设置依赖关系。
最后,将DAG保存并启动Airflow的调度程序:
dag.save()
if __name__ == "__main__":
dag.cli()
以上代码仅仅是一个简单的示例,你可以根据实际需求来定义更复杂的DAG调度逻辑。
希望以上示例能够帮助你理解如何使用Airflow进行DAG调度。