from airflow import DAG
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2021, 1, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False }
dag = DAG( 'example_dag', default_args=default_args, description='Example DAG', schedule_interval=timedelta(days=1), max_active_runs=1 )
from airflow.models import DAG, DagRun from airflow.operators.python_operator import PythonOperator
def check_max_active_runs(**kwargs): dag_id = kwargs['dag'].dag_id max_active_runs = kwargs['dag'].max_active_runs
active_runs = DagRun.find(dag_id=dag_id, state='running')
if len(active_runs) >= max_active_runs:
return False
return True
dag = DAG( dag_id='example_dag', max_active_runs=1, schedule_interval=timedelta(days=1), )
check_max_active_runs_task = PythonOperator( task_id='check_max_active_runs', python_callable=check_max_active_runs, provide_context=True, dag=dag )
Use DagRun.find() 函数获取正在运行的 DAG 实例,如果当前 DAG 实例数量超出最大值,就停止 DAG 运行。
这种方法可以确保 DAG 运行次数不超过指定数量。
上一篇:AWS托管Airflow - 如何解决触发长时间运行的Lambda函数时的350秒超时问题
下一篇:AWS托管的.NETMVC应用程序出现“UnauthorizedAccessException-Accesstothepath<>isdenied”错误。