当Airflow DAG正在运行但任务未被执行时,可能有以下几个原因:
is_active
属性为True。您可以通过以下代码检查和设置DAG的状态:from airflow.models import DAG
dag_id = 'your_dag_id'
dag = DAG(dag_id)
# 检查DAG的状态
print(dag.is_active)
# 设置DAG的状态为活动
dag.is_active = True
state
属性为"running"或"queued"。您可以使用以下代码检查和设置任务的状态:from airflow.models import TaskInstance
dag_id = 'your_dag_id'
task_id = 'your_task_id'
execution_date = 'your_execution_date'
# 获取任务实例
task_instance = TaskInstance(dag_id=dag_id, task_id=task_id, execution_date=execution_date)
# 检查任务的状态
print(task_instance.state)
# 设置任务的状态为等待运行
task_instance.state = "queued"
from airflow.models import TaskInstance
dag_id = 'your_dag_id'
task_id = 'your_task_id'
execution_date = 'your_execution_date'
# 获取任务实例
task_instance = TaskInstance(dag_id=dag_id, task_id=task_id, execution_date=execution_date)
# 检查任务的依赖关系
print(task_instance.are_dependencies_met())
通过排除上述问题,您应该能够解决Airflow DAG正在运行但任务未被执行的问题。