可以使用cron表达式和datetime库来解决该问题。
示例代码:
default_args = { 'start_date': datetime(2021, 7, 1), 'retries': 1, 'retry_delay': timedelta(minutes=5) }
dag = DAG('my_dag', default_args=default_args, schedule_interval='0 8 * * *')
def my_func(): print('Hello World')
task = PythonOperator( task_id='my_task', python_callable=my_func, dag=dag )
说明:schedule_interval参数用于定义DAG的运行频率,可以使用cron表达式来实现更精细的控制。以上代码中,DAG会在每天早上8点运行。
default_args = { 'start_date': datetime(2021, 7, 1), 'retries': 1, 'retry_delay': timedelta(minutes=5) }
dag = DAG('my_dag', default_args=default_args, schedule_interval=None)
def my_func(): print('Hello World')
task = PythonOperator( task_id='my_task', python_callable=my_func, dag=dag )
now = datetime.now() if now.hour < 8: dag.start_date = datetime(now.year, now.month, now.day, 8, 0) else: dag.start_date = datetime(now.year, now.month, now.day, 8, 0) + timedelta(days=1)
说明:以上代码中,我们在定义DAG的时候,将schedule_interval设为None,表示不激活自动调度。然后使用datetime库来计算下一次DAG的运行时间,让其在每天早上8点运行。
上一篇:AirflowDAG-FailedTaskDoesn'tShowFailStatusasItShould
下一篇:AirflowDAG:MovingemptyCSVfilestoarchivebucketandinsertingnon-emptyfilesintoBigQuery