在Airflow中实现同一计划和不同计划的多任务
在Airflow中,可以使用DAG(有向无环图)和Operator来实现同一计划和不同计划的多任务。DAG是任务之间的依赖关系图,其中包含多个操作符或任务。在每个DAG中,可以定义多个操作符,每个操作符代表一个任务。然后,使用调度器将DAG图中的任务分配到可用的执行器中。示例代码:
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def task1(params):
# do something
return
def task2(params):
# do something
return
def task3(params):
# do something
return
# 定义DAG
dag = DAG(
'my_dag',
description='my_dag',
start_date=datetime(2018, 12, 31),
schedule_interval='0 0 * * *',
default_args={
'owner': 'my_company',
'depends_on_past': False,
'start_date': datetime(2018, 12, 31),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
)
# 定义task1
task_1 = PythonOperator(
task_id='task_1',
provide_context=True,
python_callable=task1,
params={'param1': 1, 'param2': 2},
dag=dag,
)
# 定义task2
task_2 = PythonOperator(
task_id='task_2',
provide_context=True,
python_callable=task2,
params={'param3': 3, 'param4': 4},
dag=dag,
)
# 定义task3
task_3 = PythonOperator(
task_id='task_3',
provide_context=True,
python_callable=task3,
params={'param5': 5, 'param6': 6},
dag=dag,
)
task_1 >> task_2 >> task_3 # 定义task执行顺序
以上代码中,定义了3个PythonOperator并设置不同的task_id,每个PythonOperator都代表了一个任务,可以在其中定义需要执行的Python函数,并通过provide_context=True来获取当前任务的上下文。同时,DAG也定义了任务的执行顺序,其中task1先执行,然后执行task2,最后执行task3。在DAG中,一些关键的参数,如start_date、schedule_interval、retries等,也被设置在default_args中。