在使用 Airflow 时,任务之间的依赖关系对于工作流的正确执行至关重要,因此很少情况下不需要定义任务之间的依赖性。 如果需要按照某个先后顺序执行任务,可以按照以下的方式进行设置:
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('sequential_task_execution',
default_args=default_args,
schedule_interval='@once')
def task1():
print("Executing task 1")
def task2():
print("Executing task 2")
def task3():
print("Executing task 3")
t1 = PythonOperator(
task_id='task1',
python_callable=task1,
dag=dag
)
t2 = PythonOperator(
task_id='task2',
python_callable=task2,
dag=dag
)
t3 = PythonOperator(
task_id='task3',
python_callable=task3,
dag=dag
)
# 设置任务执行的顺序
t1 >> t2 >> t3
然后在 DAG 中定义任务,使用 PythonOperator
定义每个任务的执行内容,然后按照所需的执行顺序使用位移运算符( >>
)来连接任务。 这样就实现了按照顺序执行任务的功能。