在Airflow 2.4.2中,KubernetesPodOperator的on_failure_callback回调函数似乎在任务失败时不起作用。要解决这个问题,可以通过自定义Operator并覆盖其中的execute方法来实现。下面是一个示例代码:
from airflow.utils.decorators import apply_defaults
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
class CustomKubernetesPodOperator(KubernetesPodOperator):
@apply_defaults
def __init__(self, on_failure_callback=None, *args, **kwargs):
super(CustomKubernetesPodOperator, self).__init__(*args, **kwargs)
self.on_failure_callback = on_failure_callback
def execute(self, context):
try:
super().execute(context)
except Exception as e:
if self.on_failure_callback:
self.on_failure_callback(self, context)
raise e
在上面的示例中,我们继承了KubernetesPodOperator并重写了execute方法。在执行任务时,我们首先尝试调用原始execute方法;如果任务失败,我们会调用on_failure_callback回调函数并重新抛出异常以确保任务失败。您可以使用这个定制的Operator替换您的KubernetesPodOperator实例并指定您的on_failure_callback函数来处理任务失败。