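可以通过 Airflow 的参数来控制并发和重试:max_active_runs 是 DAG 级参数(需直接传给 DAG(...)),用于限制同一个 DAG 同时处于运行状态的 DAG run 数量;retries(以及 retry_delay)是任务级参数,通常放在 default_args 中,对 DAG 内的每个任务生效。同时,可以使用 Operator 的 set_upstream 和 set_downstream 方法(或等价的 `>>` / `<<` 运算符)来声明任务之间的依赖关系。彼此之间没有依赖的任务可以被调度器并行执行,从而优化任务的执行顺序和并发度。代码示例如下: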
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
# Defaults applied to every operator created in this DAG. Only keys that
# correspond to operator (BaseOperator) arguments take effect here; DAG-level
# settings such as max_active_runs must be passed to DAG(...) directly,
# otherwise they are silently ignored.
default_args = {
    'owner': 'airflow',
    # A task instance does not wait on the success of its previous run.
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    # Each failed task instance is retried up to 3 times, 5 minutes apart.
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    # DAG-level concurrency limit: at most one run of this DAG is active at
    # a time. (Placing this key in default_args would have no effect.)
    max_active_runs=1,
    # Do not create backfill runs for past schedule intervals.
    catchup=False,
    # No schedule: the DAG only runs when triggered manually or externally.
    schedule_interval=None,
)
def long_running_task():
    """Stand-in for a long-running unit of work.

    Intended to be replaced with the real workload. As written it performs
    no work and returns ``None``.
    """
    return None
with dag:
    # Three placeholder tasks that share one callable. Operators created
    # inside the `with dag:` context are attached to `dag`, and pick up the
    # DAG's default_args (e.g. retries) as their defaults.
    task_1 = PythonOperator(task_id='task_1', python_callable=long_running_task)
    task_2 = PythonOperator(task_id='task_2', python_callable=long_running_task)
    task_3 = PythonOperator(task_id='task_3', python_callable=long_running_task)

    # Fan-out: task_2 and task_3 both depend on task_1 (equivalent to
    # `task_1 >> task_2` followed by `task_1 >> task_3`). They have no
    # dependency on each other, so the scheduler may run them concurrently.
    task_1.set_downstream([task_2, task_3])