可以使用 Airflow 提供的 TriggerDagRunOperator 和 ExternalTaskSensor 来实现任务数量控制。在 DAG 中添加 TriggerDagRunOperator,指定需要触发的 DAG ID,以及在何时触发。在需要等待外部任务结束后才能开始的任务中,添加 ExternalTaskSensor,指定需要等待的任务的 DAG ID 和任务 ID。这样可以保证在指定数量的任务运行完后,才会触发新的任务运行。示例代码如下:
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.sensors import ExternalTaskSensor
# 定义 DAG
# 任务数量控制 DAG
task_control_dag = DAG(
'task_control',
schedule_interval=None,
max_active_runs=1
)
# 在需要限定数量的 DAG 中添加 TriggerDagRunOperator
trigger = TriggerDagRunOperator(
task_id='trigger_dagrun',
trigger_dag_id='my_dag',
dag=task_control_dag,
python_callable=lambda context, dag_run_obj: len(context['dag_run'].get_runs()) < 3
# 当运行中的任务数小于 3 时触发新任务
)
# 在需要等待外部任务结束后才能开始的任务中,添加 ExternalTaskSensor
wait_for_task = ExternalTaskSensor(
task_id='wait_for_task',
external_dag_id='my_dag',
external_task_id='some_task',
dag=task_control_dag
)
# 将任务顺序添加到 DAG 中
wait_for_task >> trigger >> some_other_task
上一篇:AirflowDAG步骤依赖关系
下一篇:AirflowDAG持续失败