Airflow 的 clear_tasks 命令只会清除任务状态,并不会从数据库中删除数据。
如果需要从数据库中删除数据,可以使用以下代码片段:
from airflow import settings
from airflow.models import TaskInstance
session = settings.Session()
# 删除指定 dag_id 和 execution_date 的任务实例
dag_id = 'test_dag'
execution_date = '2022-01-01'
task_instances = session.query(TaskInstance).filter(
TaskInstance.dag_id == dag_id,
TaskInstance.execution_date == execution_date).all()
for ti in task_instances:
session.delete(ti)
session.commit()
其中,dag_id
和 execution_date
分别表示 DAG 的标识符和执行日期。该代码片段将会删除指定 DAG 在指定日期的所有任务实例。请谨慎操作,避免误删数据。