在 Airflow 中,通过 DAG(有向无环图)来定义任务的依赖关系和调度逻辑。在 DAG 中,可以定义常驻任务(也叫持久化任务),这些任务会不断运行,直到 DAG 被手动停止。通过查看这些 DAG 中常驻任务的定义,就能找到 Airflow 上不断运行的任务。
以下是一个示例代码,用于查找所有 DAG 中的常驻任务:
from airflow.models import DagModel, DagBag
from airflow.utils.state import State
dags = DagBag()
for dag_id, dag in dags.dags.items():
dag_model = DagModel.get_current(dag.dag_id)
if dag_model.is_active:
for task in dag.tasks:
if task.task_type != 'SubDagOperator' and not task._upstream_task_ids:
# 直接上游的任务为常驻任务
if task.upstream_task_ids == ['']:
print(f"[{dag_id}] {task.task_id}: PERSISTENT")
# 直接下游的任务为常驻任务
elif set(task.downstream_task_ids) == {task.task_id}:
print(f"[{dag_id}] {task.task_id}: PERSISTENT")
该代码会遍历所有 DAG,找到其中的常驻任务并打印出来。对于直接上游的任务和直接下游的任务,都可以判断为常驻任务。使用方法如下:
find_persistent_tasks.py
;python /path/to/find_persistent_tasks.py