Airflow 中的 catchup API 是用于在重新启动 DAG 或添加新的任务之后,将未执行的任务补充到调度队列中。它的功能是允许用户选择是否要回溯执行过去未执行的任务,并将这些任务添加到调度队列中。
以下是使用 catchup API 的解决方法:
dag = DAG(
'my_dag',
schedule_interval='0 0 * * *',
catchup=True
)
task = PythonOperator(
task_id='my_task',
python_callable=my_callable_function,
provide_context=True,
dag=dag
)
def my_callable_function(**context):
execution_date = context['execution_date']
# 根据 execution_date 执行相应的任务逻辑
airflow trigger_dag my_dag
或使用 Airflow 的 REST API:
curl -X POST "http://localhost:8080/api/v1/dags/my_dag/dagRuns" -H "accept: application/json" -H "Content-Type: application/json" -d "{}"
这样,当 DAG 重新启动或有新任务添加后,Airflow 将自动检查未执行的任务,并将它们添加到调度队列中,使其能够按照指定的调度间隔进行执行。