使用Python的datetime库和Airflow的built-in方法来获取下一个执行时间。
代码示例:
from datetime import datetime, timedelta
from airflow.utils.dates import croniter
# 指定cron表达式
cron_expression = "0 3 * * *"
# 计算最近的下一个Datetime
now = datetime.now()
cron = croniter(cron_expression, now)
next_datetime = cron.get_next(datetime)
# 转换为下一个UTC时间
next_utc_time = next_datetime - timedelta(seconds=now.second, microseconds=now.microsecond)
next_utc_time = next_utc_time.replace(tzinfo=pytz.UTC)
# 将计算出的下一个时间传递给DAG的default_args
default_args = {
'start_date': datetime.now(),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': timedelta(days=1),
'next_execution': next_utc_time # 新增的参数
}
dag = DAG(dag_id='example_dag', default_args=default_args, schedule_interval=cron_expression)