这个问题通常是由Celery任务状态查找失败引起的。为了解决这个问题,可以尝试以下方法:
确保你已经正确配置了Celery,并且正在使用与Airflow版本兼容的Celery版本。如果你使用的是Cloud Composer,则不需要担心这个问题,因为它会自动配置Celery。
确认你的Celery worker正在运行,而且拥有适当的权限和资源。你可以通过运行以下命令来检查Celery worker的状态:
celery -A status
确认你的DAG已经成功启动,并且你正在使用正确的任务ID来查询任务状态。你可以通过以下代码示例来获取当前DAG中所有任务的状态:
from airflow.models import TaskInstance
from airflow.utils.state import State
dag_id = ''
ti_list = TaskInstance.find(dag_id=dag_id)
for ti in ti_list:
if ti.state == State.RUNNING:
# The task is running
print ti.task_id
elif ti.state == State.SUCCESS:
# The task has completed successfully
print ti.task_id
elif ti.state == State.FAILED:
# The task has failed
print ti.task_id
希望这些解决方法可以帮助你解决问题。