在Airflow DAG文件中为需要使用池的任务设置优先级,并将池设置为fair。示例代码如下:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
}
dag = DAG('my_dag', default_args=default_args)
def task_1():
# do something
def task_2():
# do something
def task_3():
# do something
t1 = PythonOperator(
task_id='task_1',
python_callable=task_1,
dag=dag,
pool='my_pool',
priority_weight=1
)
t2 = PythonOperator(
task_id='task_2',
python_callable=task_2,
dag=dag,
pool='my_pool',
priority_weight=2
)
t3 = PythonOperator(
task_id='task_3',
python_callable=task_3,
dag=dag,
pool='my_pool',
priority_weight=3
)
t1.set_downstream(t2)
t2.set_downstream(t3)
在这个例子中,任务t1、t2和t3都使用池my_pool,并为它们设置了不同的优先级。设置了池的fair参数后,优先级低的任务不会优先触发。