In Airflow, independent tasks in a DAG can execute in parallel; how many run at once is governed by the scheduler's parallelism settings (such as the `parallelism` option) together with the dependencies you declare between tasks. The following walkthrough, with code examples, shows how to define tasks that run concurrently:
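For reference, the installation-wide knobs live in `airflow.cfg` (or the matching `AIRFLOW__CORE__*` environment variables). A hedged sketch of the relevant `[core]` settings; the exact option names vary slightly between Airflow versions:

```ini
[core]
# Maximum number of task instances running across the whole installation
parallelism = 32
# Per-DAG cap (named dag_concurrency before Airflow 2.2,
# max_active_tasks_per_dag from 2.2 onward)
max_active_tasks_per_dag = 16
```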
First, import the required modules and classes:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
Next, define a function that performs the task's actual work:
def task_function(task_id):
    # Task logic goes here
    print(f"Executing task with ID: {task_id}")
Then, create a DAG and set its parameters:
dag = DAG(
    dag_id="parallel_execution_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False
)
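If you want to cap concurrency for this specific DAG rather than the whole installation, the DAG constructor also accepts a per-DAG limit. A hedged fragment, assuming Airflow 2.2+ where the argument is named `max_active_tasks` (older releases call it `concurrency`):

```python
dag = DAG(
    dag_id="parallel_execution_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
    max_active_tasks=2,  # at most 2 task instances of this DAG run at once
)
```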
Next, use PythonOperator to create several tasks and attach them to the DAG:
task1 = PythonOperator(
    task_id="task1",
    python_callable=task_function,
    op_kwargs={"task_id": "task1"},
    dag=dag
)

task2 = PythonOperator(
    task_id="task2",
    python_callable=task_function,
    op_kwargs={"task_id": "task2"},
    dag=dag
)

task3 = PythonOperator(
    task_id="task3",
    python_callable=task_function,
    op_kwargs={"task_id": "task3"},
    dag=dag
)

task4 = PythonOperator(
    task_id="task4",
    python_callable=task_function,
    op_kwargs={"task_id": "task4"},
    dag=dag
)
Finally, define the execution order by declaring dependencies between the tasks:
task1 >> [task2, task3] >> task4
The complete code example:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def task_function(task_id):
    print(f"Executing task with ID: {task_id}")

dag = DAG(
    dag_id="parallel_execution_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False
)

task1 = PythonOperator(
    task_id="task1",
    python_callable=task_function,
    op_kwargs={"task_id": "task1"},
    dag=dag
)

task2 = PythonOperator(
    task_id="task2",
    python_callable=task_function,
    op_kwargs={"task_id": "task2"},
    dag=dag
)

task3 = PythonOperator(
    task_id="task3",
    python_callable=task_function,
    op_kwargs={"task_id": "task3"},
    dag=dag
)

task4 = PythonOperator(
    task_id="task4",
    python_callable=task_function,
    op_kwargs={"task_id": "task4"},
    dag=dag
)

task1 >> [task2, task3] >> task4
With these dependencies, task2 and task3 start at the same time once task1 completes, and task4 runs only after both of them have finished.
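The fan-out/fan-in shape that `task1 >> [task2, task3] >> task4` describes can be sketched in plain Python, outside Airflow, using a thread pool. This is only an illustration of the scheduling pattern, not how Airflow itself runs tasks (Airflow's executor handles that); the `run` helper is a hypothetical stand-in for the operators above:

```python
from concurrent.futures import ThreadPoolExecutor

results = []  # records the order in which "tasks" complete

def run(task_id):
    # Hypothetical stand-in for one Airflow task
    results.append(task_id)
    return task_id

# task1 runs first (the fan-out point)
run("task1")

# task2 and task3 run concurrently, like the parallel branch in the DAG
with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(run, ["task2", "task3"]))

# task4 runs only after both parallel tasks have finished (the fan-in point)
run("task4")

print(results[0], results[-1])  # → task1 task4
```

Note that task2 and task3 may finish in either order, but task1 is always first and task4 always last, mirroring the dependency graph.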