首先需要安装Apache Airflow,可以通过以下命令安装:
pip install apache-airflow
在Airflow中,DAG是指一组任务。可以使用Python代码创建DAG。以下代码演示了创建一个DAG以运行三个任务的方法:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(
'MyDAG',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
task1 = BashOperator(
task_id='task1_exec',
bash_command='echo "Hello World"',
dag=dag,
)
task2 = BashOperator(
task_id='task2_exec',
bash_command='echo "Hello World"',
dag=dag,
)
task3 = BashOperator(
task_id='task3_exec',
bash_command='echo "Hello World"',
dag=dag,
)
task1.set_downstream([task2, task3])
在这个DAG中,任务1和任务2是同时进行的,并且任务3在任务1和任务2完成后执行。使用set_downstream方法设置任务的依赖关系。
可以使用Airflow的TaskGroup来创建任务组。以下代码演示了如何将任务2和任务3分组,并设置任务组的依赖关系:
from airflow.utils.task_group import TaskGroup
with TaskGroup("MyTaskGroup") as task_group:
task2 = BashOperator(
task_id="task2",
bash_command="echo 'Hello World from task 2'",
dag=dag
)
task3 = BashOperator(
task_id="task3",
bash