在Airflow中,可以使用Python的time模块来实现等待(休眠)任务的高效性。下面是一个示例代码:
import time
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def sleep(seconds):
time.sleep(seconds)
print(f"Waited for {seconds} seconds.")
default_args = {
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
with DAG('sleep_task', default_args=default_args, schedule_interval=None) as dag:
task = PythonOperator(
task_id='sleep_task',
python_callable=sleep,
op_args=[10], # 休眠10秒钟
)
在上面的示例中,我们创建了一个名为sleep_task
的DAG,并定义了一个名为sleep
的Python函数,该函数使用time.sleep()
方法来实现等待(休眠)任务。然后,我们使用PythonOperator
来创建一个名为sleep_task
的任务,该任务调用了sleep
函数,并传递了一个参数10
,表示休眠10秒钟。
这样,当DAG运行时,sleep_task
任务将会等待(休眠)10秒钟,然后打印出Waited for 10 seconds.
的消息。
请注意,要在Airflow中使用上述代码示例,需要安装apache-airflow
和apache-airflow-providers-cncf-kubernetes
等相关依赖库。