在Airflow中,可以使用schedule_interval
参数来设置任务的运行间隔。对于有不同间隔的任务或者父子DAG频率不同的情况,可以通过设置不同的schedule_interval
来实现。
下面是一个代码示例:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
# 定义父DAG
default_args = {
'owner': 'airflow',
'start_date': datetime(2022, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('parent_dag', default_args=default_args, schedule_interval='@daily')
# 定义子DAG1,运行频率为每小时一次
subdag1 = DAG('subdag1', default_args=default_args, schedule_interval='@hourly')
with subdag1:
task1 = DummyOperator(task_id='subdag1_task1')
# 定义子DAG2,运行频率为每天一次
subdag2 = DAG('subdag2', default_args=default_args, schedule_interval='@daily')
with subdag2:
task2 = DummyOperator(task_id='subdag2_task1')
# 定义父DAG的任务
start_task = DummyOperator(task_id='start_task', dag=dag)
end_task = DummyOperator(task_id='end_task', dag=dag)
# 设置父子DAG的依赖关系
start_task >> subdag1 >> end_task
start_task >> subdag2 >> end_task
在上面的示例中,父DAG的schedule_interval
设置为每天一次,而子DAG1和子DAG2分别设置为每小时一次和每天一次。通过设置不同的schedule_interval
,可以实现父子DAG的频率不同的需求。
注意,子DAG在父DAG中被视为一个单独的任务,所以可以像其他任务一样设置它们的依赖关系。