一种可能的解决方法是在 DAG
定义中添加一个额外的参数 max_active_runs
,限制同时运行的 DAG instance 数量。这可以确保当一个 DAG 还在运行时,不会间隔几秒钟就启动一个新的 instance。
示例代码如下:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'my_dag',
default_args=default_args,
description='My DAG',
schedule_interval=timedelta(minutes=30),
max_active_runs=1
)
def my_task():
# Add your task logic here
pass
task = PythonOperator(
task_id='my_task',
python_callable=my_task,
dag=dag
)
在以上示例代码中,我们将 max_active_runs
参数设置为 1
,表示同时最多只有一个 my_dag
instance 在运行。这样可以避免并行运行的 DAG instance 在超出预期的 schedule_interval 后仍然被触发的问题。