要实现Airflow DAG多次运行,可以使用execution_date
参数来指定DAG的运行日期。下面是一个示例代码:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'example_dag',
default_args=default_args,
schedule_interval=timedelta(days=1)
)
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task1 >> task2
在上面的示例中,我们创建了一个名为example_dag
的DAG,它将每天运行一次。start_date
参数指定了DAG的开始日期,schedule_interval
参数指定了DAG的运行间隔。
要运行多个实例的DAG,可以通过execution_date
参数指定不同的运行日期。例如,要运行2022年1月1日和2022年1月2日的DAG实例,可以使用以下代码:
from airflow.models import DagRun
from airflow.utils.dates import days_ago
dag_id = 'example_dag'
dag = DagRun.find(dag_id=dag_id, execution_date=datetime(2022, 1, 1), state="running")
if not dag:
# 创建一个新的DAG实例
dagrun = DagRun(
dag_id=dag_id,
execution_date=datetime(2022, 1, 1),
state="running",
external_trigger=True
)
dagrun.run(ignore_task_deps=True, ignore_ti_state=True)
dag = DagRun.find(dag_id=dag_id, execution_date=datetime(2022, 1, 2), state="running")
if not dag:
# 创建一个新的DAG实例
dagrun = DagRun(
dag_id=dag_id,
execution_date=datetime(2022, 1, 2),
state="running",
external_trigger=True
)
dagrun.run(ignore_task_deps=True, ignore_ti_state=True)
上面的代码首先检查指定日期的DAG实例是否已经存在,如果不存在,则创建一个新的DAG实例并运行。external_trigger=True
表示这是一个外部触发的DAG实例。
这样,就可以通过指定不同的execution_date
参数来运行多个实例的DAG。