要解决这个问题,你可以在Airflow的DAG中使用"catchup=False"参数来禁用回溯功能,确保只有当前时间之前的任务会被执行。同时,你可以在任务的Python代码中添加逻辑,以检查执行日期是否在未来,如果是,则抛出一个异常。
下面是一个示例代码:
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def check_execution_date(execution_date, **kwargs):
now = datetime.now()
if execution_date > now:
raise Exception("Execution date is in the future")
dag = DAG(
'example_dag',
start_date=datetime(2021, 1, 1),
catchup=False
)
task1 = PythonOperator(
task_id='check_execution_date',
python_callable=check_execution_date,
provide_context=True,
dag=dag
)
task2 = PythonOperator(
task_id='another_task',
python_callable=another_function,
provide_context=True,
dag=dag
)
task1 >> task2
在这个示例中,check_execution_date
函数接受execution_date
和kwargs
参数。它首先获取当前时间,然后检查execution_date
是否在未来。如果是,则抛出一个异常。如果不是,则任务将继续执行。
通过在DAG中设置catchup=False
,你可以确保只有当前时间之前的任务会被执行。这样,即使从Web UI手动触发,也不会执行未来的任务。