在 Airflow 中,max_active_runs 属性用于控制正在运行的并发任务数的最大数量。默认情况下,此属性设置为16。如果在 DAG 中定义了多个任务,则可以使用此属性来限制同一时间运行的任务数量。
如果发现 max_active_runs 属性没有增加,可能是由于以下原因:
airflow clear -l DAG_ID && airflow scheduler
以下是一个简单的 DAG 定义,它使用 max_active_runs 属性控制同一时间运行的任务数量:
from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2021, 1, 1), 'retries': 1, 'retry_delay': timedelta(minutes=5) }
dag = DAG('example_dag', default_args=default_args, schedule_interval=timedelta(days=1), max_active_runs=5)
t1 = BashOperator( task_id='task_1', bash_command='echo "Hello World!"', dag=dag )
t2 = BashOperator( task_id='task_2', bash_command='echo "Hello World!"', dag=dag )
t3 = BashOperator( task_id='task_3', bash_command='echo "Hello World!"', dag=dag )
t4 = BashOperator( task_id='task_4', bash_command='echo "Hello World!"', dag=dag )
t5 = BashOperator( task_id='task_5', bash_command='echo "Hello World!"', dag=dag )
t1 >> [t2, t3, t4, t5]