在Airflow中,如果DAG的失败回调被多次调用,可能是由于多个任务重试失败导致的。下面是一个解决方法的示例代码:
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
from airflow.utils.state import State
def my_failed_task_callback(context):
# 检查任务是否已经失败过
if context['task_instance'].state == State.FAILED:
# 在这里添加你的处理逻辑,比如发送通知或记录日志
dag = DAG(dag_id='my_dag', default_args=default_args)
@provide_session
def my_task_function(*args, **kwargs):
# 在这里添加你的任务逻辑
pass
task = PythonOperator(
task_id='my_task',
python_callable=my_task_function,
on_failure_callback=my_failed_task_callback,
dag=dag
)
在上面的代码中,我们定义了一个my_failed_task_callback
函数作为任务的失败回调。在回调函数中,我们首先检查任务的状态是否为失败状态,以确保只有在任务失败时才触发回调。然后,你可以在回调函数中添加你自己的处理逻辑,比如发送通知或记录日志。
接下来,我们创建一个PythonOperator
任务,并将回调函数传递给on_failure_callback
参数。这样,当任务失败时,Airflow将自动调用我们定义的回调函数。
请注意,这只是一个示例代码,你可能需要根据你的实际需求进行相应的修改。