在Airflow中,可以使用DAG(有向无环图)定义一组有关任务的工作流程。对于任务之间存在依赖关系的情况,Airflow允许在DAG中定义Task之间的依赖关系。如果需要在同时运行任务的同时,确保任务在某个给定任务之后按特定顺序运行,请使用set_upstream
和set_downstream
方法。
示例代码:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'my_dag',
default_args=default_args,
description='A simple DAG',
schedule_interval=timedelta(days=1),
)
t1 = BashOperator(
task_id='task_one',
bash_command='echo 1',
dag=dag,
)
t2 = BashOperator(
task_id='task_two',
bash_command='echo 2',
dag=dag,
)
t3 = BashOperator(
task_id='task_three',
bash_command='echo 3',
dag=dag,
)
t4 = BashOperator(
task_id='task_four',
bash_command='echo 4',
dag=dag,
)
t1.set_downstream(t2)
t2.set_downstream(t3)
t3.set_downstream(t4)
在此示例中,我们定义了一个包含四个任务的DAG,并使用set_downstream
方法定义了它们之间的依赖关系。t1
在t2
之前运行,t2
在t3
之前运行,t3
在t4
之前运行。由于每个任务都是独立运行的,