要解决Airflow调度程序不遵守EndTime与datetime.now()+timedelta()的问题,您可以使用以下解决方法:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def check_end_time():
current_time = datetime.now()
end_time = current_time + timedelta(hours=2) # 设置截止时间为当前时间后的2小时
if current_time > end_time:
# 执行到达截止时间后的操作
dag = DAG('my_dag', description='My DAG', schedule_interval='0 * * * *')
task = PythonOperator(
task_id='check_end_time',
python_callable=check_end_time,
dag=dag,
)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.sensors import TimeSensor
dag = DAG('my_dag', description='My DAG', schedule_interval='0 * * * *')
end_time = datetime.now() + timedelta(hours=2) # 设置截止时间为当前时间后的2小时
sensor_task = TimeSensor(
task_id='check_end_time',
target_time=end_time,
dag=dag,
)
# 或者使用ExternalTaskSensor
# from airflow.operators.sensors import ExternalTaskSensor
# sensor_task = ExternalTaskSensor(
# task_id='check_end_time',
# external_dag_id='external_dag',
# external_task_id='external_task',
# check_existence=True,
# dag=dag,
# )
以上是两种解决方法,您可以根据您的需求选择适合的方法来确保Airflow调度程序遵守EndTime与datetime.now()+timedelta()。