在Airflow中,sla_miss_callback回调函数应该在任务因未遵守服务水平协议(SLA)而错过时被触发。如果不触发该函数,可能是因为在DAG或任务中没有定义SLA或者SLA标记的时间太短。下面是一个示例,其中显示了如何在DAG任务中定义SLA并触发回调函数:
from airflow.models.baseoperator import BaseOperator from datetime import timedelta from airflow.utils.decorators import apply_defaults
class MyOperator(BaseOperator): @apply_defaults def init(self, sla=timedelta(minutes=10), *args, **kwargs) -> None: super().init(*args, **kwargs) self.sla = sla
def execute(self, context):
# 执行任务逻辑
pass
dag = DAG(...) task = MyOperator( task_id='my_task', sla=timedelta(minutes=5), dag=dag )
也可以在DAG级别上定义SLA,并在DAG中触发回调函数:
dag = DAG(..., sla_miss_callback=my_callback, sla=timedelta(minutes=5)) task = MyOperator( task_id='my_task', dag=dag )
请注意,必须在DAG对象中指定sla_miss_callback才能触发回调函数,在任务对象中指定的回调函数将被忽略。