在使用 ExternalTaskSensor 时，如果需要清除其他 DAG 中任务的状态，需要借助 ExternalTaskMarker 来实现。但有时会出现状态无法清除的情况，即任务状态为 None。
要解决这个问题，可以在使用 ExternalTaskMarker 时添加 if 条件判断，确保只有在任务状态不为 None 时才执行清除操作。代码示例如下：
from airflow.operators.dummy_operator import DummyOperator from airflow.operators.bash_operator import BashOperator from airflow.sensors.external_task_sensor import ExternalTaskSensor from airflow.operators.python_operator import PythonOperator from airflow.operators.dagrun_operator import TriggerDagRunOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.utils.trigger_rule import TriggerRule from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.models import Variable
def clear_other_dag_task(
    external_dag_id: str = "external_dag_id",
    external_task_id: str = "external_task_id",
) -> int:
    """Clear a task that belongs to another DAG, skipping runs with no state.

    Looks up the task instances recorded for ``external_task_id`` in
    ``external_dag_id`` whose state is not NULL and hands them to Airflow's
    ``clear_task_instances`` so they can be scheduled again. Every matching
    run of the task is cleared; instances whose state is ``None`` are left
    untouched.

    Args:
        external_dag_id: ID of the DAG that owns the task to clear.
        external_task_id: ID of the task to clear inside that DAG.

    Returns:
        The number of task instances that were cleared (0 when none had a
        recorded state).
    """
    # Imported here so the function carries its own dependencies instead of
    # relying on names the module-level import list does not provide.
    from airflow.models import TaskInstance
    from airflow.models.taskinstance import clear_task_instances
    from airflow.utils.session import create_session

    # Operators (ExternalTaskSensor / ExternalTaskMarker) are DAG-definition
    # objects: they need a real DAG, cannot be poked or run ad hoc, and have
    # no state() method. The recorded state lives on TaskInstance rows, so
    # the "state is not None" check is done against the metadata database.
    with create_session() as session:
        stateful_instances = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == external_dag_id,
                TaskInstance.task_id == external_task_id,
                TaskInstance.state.isnot(None),
            )
            .all()
        )
        if stateful_instances:
            clear_task_instances(stateful_instances, session)
        # Count while the session is still open; create_session commits the
        # changes when the block exits.
        return len(stateful_instances)
# Module-level call: the clear runs as soon as this file is executed or imported.
clear_other_dag_task()