在DAG定义中添加一个concurrency
参数,并将其设置为大于零的值,以确保DAG中的所有任务可以同时运行。
例如,以下代码段定义了一个名为my_dag
的DAG,并将concurrency
设置为2
,每个任务都将在它们结束之前并行运行,以便加快处理时间:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('my_dag',
default_args=default_args,
schedule_interval='@daily',
concurrency=2)
t1 = BashOperator(
task_id='task_1',
bash_command='echo task_1',
dag=dag)
t2 = BashOperator(
task_id='task_2',
bash_command='echo task_2',
dag=dag)
t3 = BashOperator(
task_id='task_3',
bash_command='echo task_3',
dag=dag)
t4 = BashOperator(
task_id='task_4',
bash_command='echo task_4',
dag=dag)
t1 >> [t2, t3] >> t4
在此示例中,concurrency
参数设置为2
,因此在同一时间最多只能运行两个任务。测试通过此代码段可以同时运行my_dag
中的所有任务。