在Airflow中,DAG的任务可以使用Python的time.sleep()
函数来模拟休眠操作。当任务休眠时,调度器会等待任务完成后再执行其他DAG。
以下是一个示例代码,演示了如何在任务中使用time.sleep()
函数进行休眠操作:
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import time
def task_sleep():
print("Start sleeping...")
time.sleep(60) # 休眠60秒
print("Finished sleeping!")
default_args = {
'start_date': datetime(2022, 1, 1),
'retries': 0
}
with DAG('sleep_dag', schedule_interval='@once', default_args=default_args) as dag:
task1 = PythonOperator(
task_id='task_sleep',
python_callable=task_sleep
)
task1
在上述代码中,我们定义了一个名为sleep_dag
的DAG,使用PythonOperator
来执行task_sleep
函数。task_sleep
函数中使用time.sleep(60)
来模拟任务的休眠操作,休眠时间为60秒。
当我们运行这个DAG时,任务会休眠60秒,然后输出"Finished sleeping!"。在这段时间内,调度器会等待任务完成后再执行其他DAG。
注意,为了使调度器停止执行其他DAG,我们将schedule_interval
参数设置为@once
,这意味着DAG只会被调度执行一次。如果你想要任务休眠时,调度器停止执行其他DAG的所有实例,可以使用其他的schedule_interval
参数,比如None
或者@hourly
。