需要在任务中添加try-except代码块,捕获回调函数中的异常并将任务标记为失败。以下是示例代码:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def task_callback():
raise Exception("Callback function failed")
def task_function():
# do something
pass
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1)
}
with DAG('example_dag', default_args=default_args) as dag:
t1 = PythonOperator(
task_id='task1',
python_callable=task_function
)
t2 = PythonOperator(
task_id='task2',
python_callable=task_function,
on_failure_callback=task_callback
)
t1 >> t2
# Add try-except block to catch exceptions in callback function
def catch_exception(context):
ti = context.get("task_instance")
if ti.try_number == 1:
# Exception happened in the first try, mark as failed
ti.xcom_push(key="Exception", value=True)
raise Exception("Callback function failed")
t2.on_failure_callback = catch_exception
在上面的示例中,我们定义了两个PythonOperator任务。其中,第二个任务(task2)被指定了一个on_failure_callback参数,它指向我们的回调函数task_callback(),并且它在第一个任务(task1)完成后执行。
我们还定义了一个名为catch_exception()的新函数,该函数在回调函数中捕获异常并将任务标记为失败。为此,我们使用context对象来获取任务实例(ti)并检查任务重试次数,以确保我们仅在第一次尝试时标记任务为失败。
最后,我们将catch_exception()函数分配给t2.on_failure_callback回调函数,以捕获回调函数中的异常并将任务标记为失败。