在Airflow中,可以通过设置start_date
和schedule_interval
来控制DAG中任务的并行延迟和执行延迟。
首先,在DAG的定义中,设置start_date
为一个过去的时间,以确保DAG会立即开始执行。然后,使用schedule_interval
来设置任务的执行间隔。
接下来,可以在每个任务的定义中使用PythonOperator
来执行自定义Python函数。在函数中,可以使用time.sleep()
方法来增加延迟。
以下是一个示例代码,演示了如何实现任务的并行延迟/执行延迟增加60秒:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import time
def task1():
# 任务1的逻辑
time.sleep(60)
def task2():
# 任务2的逻辑
time.sleep(60)
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'parallel_delay_example',
default_args=default_args,
description='DAG with tasks having parallel delay',
schedule_interval=timedelta(days=1),
)
task1 = PythonOperator(
task_id='task1',
python_callable=task1,
dag=dag,
)
task2 = PythonOperator(
task_id='task2',
python_callable=task2,
dag=dag,
)
task1 >> task2
在上述代码中,start_date
被设置为datetime(2022, 1, 1)
,确保DAG会立即开始执行。schedule_interval
被设置为timedelta(days=1)
,表示每天执行一次。
task1
和task2
是两个使用PythonOperator
定义的任务。在每个任务的Python函数中,使用time.sleep(60)
来增加60秒的延迟。
最后,使用task1 >> task2
表示task1
必须在task2
之前执行。
请注意,schedule_interval
以及任务的执行时间都是基于Airflow调度器的规则来确定的,所以实际的延迟可能会有一些偏差。