检查DAG的schedule_interval是否被正确地设置。如果schedule_interval太短,可能会导致DAG的运行数量超出了Airflow的默认限制。可以尝试将schedule_interval调高一些。
检查DAG所依赖的传感器或任务是否成功完成,并且在Airflow中已被标记为已完成。如果存在失败的依赖关系,则DAG将处于等待状态,并且不会启动。
如果DAG已被标记为已完成,则可以从Airflow的数据库中删除对应的记录,以便重新执行。可以通过以下代码实现:
from airflow import models, settings
from airflow.models import DagRun
dag_id = 'dag_id_to_delete'
session = settings.Session()
dag_runs = session.query(DagRun).filter(DagRun.dag_id == dag_id).all()
for dag_run in dag_runs:
session.delete(dag_run)
session.commit()
如果上述方法都无法解决问题,则可以考虑重建Airflow的元数据数据库,以确保数据库没有损坏。可以按以下步骤操作:
停止所有Airflow进程
删除Airflow数据库:rm -rf /path/to/airflowdb
重新初始化Airflow数据库:airflow initdb
启动Airflow daemon:airflow webserver -p 8080 & airflow scheduler
重新创建DAG并测试。