Airflow调度器Catchup是指当我们创建一个新的DAG或修改已有的DAG时,Airflow会根据DAG定义的schedule_interval将已过期但未执行的任务自动补充执行。但是,如果我们使用的是较旧的版本的Airflow,在DAG定义中修改schedule_interval后,Catchup不会自动应用于所有过去的时间段。这意味着,任何过去的和计划的未来任务都不会自动重新计划,因此我们需要使用以下代码手动触发Catchup:
from airflow import DAG, configuration
from datetime import datetime
from airflow.api.common.experimental.trigger_dag import trigger_dag
# 设置catchup为True
configuration.conf.set('scheduler', 'catchup_by_default', 'True')
# 定义DAG
dag = DAG(
dag_id='my_dag',
description='My DAG with Catchup',
start_date=datetime(2022, 1, 1),
schedule_interval='0 0 * * *',
)
# 手动触发Catchup
trigger_dag(dag_id='my_dag', run_id=dag.default_args['execution_date'])
在上面的代码中,我们首先将Airflow的默认Catchup配置设置为True,然后手动触发DAG的运行,从而触发Catchup,并在Catchup过程中自动运行所有过去的和未来计划的任务。这将确保您的DAG在调度后立即启动,并保持后续任务按照设定的调度间隔运行。