Airflow默认不允许同一DAG实例的并发运行。这是为了避免出现资源竞争和数据争抢。但是,可以通过配置Airflow,使其允许同一DAG的并发运行。
方法一:在DAG定义中添加参数concurrency。
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
'concurrency': 2, # 设置并发数为2
}
dag = DAG(
'my_dag',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
方法二:在Airflow配置文件中(默认路径为~/airflow/airflow.cfg)修改参数parallelism。
parallelism = 16 # 设置并发数为16
dag_concurrency = 4 # 设置每个DAG的并发数为4
需要注意的是,虽然Airflow允许同一DAG的并发运行,但是有些操作(比如修改数据库结构)可能会引起竞争问题。因此,在设计DAG时,建议避免使用全局变量等可能造成竞争的语法。