在Airflow中,使用wait_for_completion参数来定义当DAG的运行被触发时,是否等待当前运行中的任务完成后再开始新的运行。然而,有时候wait_for_completion参数并没有按预期生效,导致新的运行被触发后旧的任务仍然在运行中。
这个问题的解决方法是使用两个参数:dag_concurrency和max_active_runs。这两个参数的作用是限制同时运行的DAG数量和单个DAG执行的并行任务数,以确保每个DAG运行期间始终有足够的任务资源。
以下是一些示例代码,展示如何使用这两个参数来限制DAG并发项的数量:
from airflow.models import DAG
from datetime import datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 12, 1),
'retries': 1,
}
dag = DAG(
'my_dag',
default_args=default_args,
schedule_interval='@once',
dagrun_timeout=timedelta(minutes=60),
concurrency=2,
max_active_runs=1,
)
# other code for tasks...
在上面的代码中,concurrency参数设置为2,限制了一次同时运行的DAG数量,max_active_runs参数设置为1,限制了一个DAG同时执行的并行任务数。
通过在您的DAG定义中使用这些参数,您可以确保wait_for_completion参数按照预期工作,并避免同时运行太多的DAG和任务。
下一篇:AirflowDAG未按计划执行