在Airflow 2.3版本中,如果想要手动修改任务实例的状态为失败,但是又不想抛出异常,可以通过以下步骤完成:
from airflow.models import TaskInstance from airflow.utils import timezone from airflow.utils.state import State
ti = TaskInstance(task_id='your_task_id', dag_id='your_dag_id', execution_date=timezone.utcnow())
ti.set_state(State.FAILED)
ti.get_raw_job().save()
通过上述步骤,即可完成任务实例状态的修改。同时,如果需要自定义任务实例的失败原因,可以在修改状态的同时,传入reason参数。
ti.set_state(State.FAILED, reason='your_reason')