可以使用Airflow提供的DagRun类中的state
属性将DagRun标记为失败状态或设置为终止状态。
例如,要将DagRun标记为失败状态,可以使用以下代码:
from airflow.models import DagRun
dag_id = 'example_dag'
dag_run_id = 'manual__2020-09-01T00:00:00+00:00'
dag_run = DagRun.find(dag_id=dag_id, run_id=dag_run_id)
dag_run.state = 'failed'
dag_run.end_date = datetime.utcnow()
dag_run.update()
同样,要终止DagRun,可以将它的状态设置为killed
,并设置end_date
属性。
from airflow.models import DagRun
dag_id = 'example_dag'
dag_run_id = 'manual__2020-09-01T00:00:00+00:00'
dag_run = DagRun.find(dag_id=dag_id, run_id=dag_run_id)
dag_run.state = 'killed'
dag_run.end_date = datetime.utcnow()
dag_run.update()