在DAG中设置schedule_interval或者修改dag_id。
例如,下面的DAG设置了每隔5分钟执行一次:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'example_dag',
default_args=default_args,
schedule_interval=timedelta(minutes=5),
)
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task1 >> task2
设置schedule_interval可以确保DAG在指定的时间间隔内只被触发一次。如果您已经设置了schedule_interval,但DAG仍被多次触发,则可能是由于不同的调度器(scheduler)同时尝试运行DAG,您可以尝试调整调度器配置来解决这个问题。此外,如果您使用的是CeleryExecutor,请确保已正确配置并发任务的数量。
另外,也可以尝试修改dag_id,适用于需要在同一个文件中运行多个DAG的情况。例如,修改dag_id以避免与其他DAG冲突:
dag = DAG(
'example_dag_v2',
default_args=default_args,
schedule_interval=timedelta(minutes=5),
)