这种情况通常是由于Airflow依赖的后端数据库出现问题而导致的。解决方案是检查数据库连接是否正常,清除数据库中的任务并重新启动调度器。
以下是一个简单的代码示例,用于清除数据库中的未完成任务:
from airflow import models, settings
from airflow.contrib.jobs import EventLogger, LocalTaskJob as LJ
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.utils.state import State
import logging
# 连接数据库
session = settings.Session()
logging.getLogger().setLevel(logging.INFO)
# 清除所有已启动但未完成的任务
tasks = session.query(LJ).filter(LJ.state == State.RUNNING)
for task in tasks:
try:
logging.info("Clearing task %s", task)
task.state = State.FAILED
event_logger = EventLogger(session, task)
event_logger.on_failure("scheduler", "Task instance marked as failed")
except Exception as e:
logging.error("Error clearing task %s: %s", task, str(e))
session.commit()
# 重启调度器
scheduler = SchedulerJob()
scheduler.run()
session.close()
在清除任务后,请确保重新启动调度器。