要在不同的时间运行相同的DAG,可以使用Airflow的调度功能和时间表。以下是一个包含代码示例的解决方法:
首先,确保已安装Airflow和相关依赖项。
example_dag.py
。from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
def create_dag(dag_id, schedule, default_args):
dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
# 定义任务
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
# 设置任务之间的依赖关系
task1 >> task2
return dag
# 默认参数
default_args = {
'start_date': datetime(2022, 1, 1),
}
# 创建两个DAG,分别在不同的时间运行
dag1 = create_dag('example_dag1', '0 0 * * *', default_args) # 每天凌晨执行
dag2 = create_dag('example_dag2', '0 12 * * *', default_args) # 每天中午12点执行
将example_dag.py
文件放入Airflow的DAG目录中(默认为$AIRFLOW_HOME/dags
)。
启动Airflow调度器和Web服务器。
打开Airflow的Web界面,在"DAGs"页面中可以看到两个新创建的DAG:example_dag1和example_dag2。
可以手动触发DAG的运行,也可以根据设置的时间表自动运行DAG。
通过以上步骤,你可以在不同的时间运行相同的DAG。可以根据需要修改时间表和任务,以适应不同的场景。