这个问题通常是由于不同的时区或者时间同步问题导致的。解决方法是通过在airflow.cfg文件中配置default_timezone和catchup_by_default参数来设置时区和调度延迟:
default_timezone = Asia/Shanghai
catchup_by_default = False
此外,还可以通过在编写DAG时设置start_date和schedule_interval来进一步指定任务开始时间和调度间隔。例如,以下代码将任务于每日上午9点15分运行:
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python_operator import PythonOperator
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2021, 11, 1, 9, 15), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), }
dag = DAG(dag_id='example_dag', default_args=default_args, schedule_interval=timedelta(days=1))
def my_task(): print('Hello, world!')
task = PythonOperator(task_id='my_task', python_callable=my_task, dag=dag)
通过上述代码的设置,任务将会在每日上午9点15分开始,确保了任务按照预期的时间运行。