可以添加以下代码行,以在 DAG 同步过程中清理旧的 DAG。
from airflow.models import DAG
from airflow import settings
# 检查所有 DAG 并删除过时的 DAG
for dag in DagBag(settings.DAGS_FOLDER).dags.values():
if dag.is_subdag:
continue
existing_dag = DagModel.get_current(dag.dag_id)
if existing_dag and existing_dag.is_active:
continue
dag.clear()
print(f"Cleared old DAG: {dag.dag_id}")
这段代码将在 DAG 同步过程中迭代所有 DAG,并删除旧的 DAG,以确保只有最新的 DAG 被加载和显示。在 DAG 同步完成后,将不再出现过时的 DAG。