要确保Apache Airflow DAG正确调用了on_success_callback和on_failure_callback,可以按照以下步骤进行解决:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'on_success_callback': my_success_callback,
'on_failure_callback': my_failure_callback
}
dag = DAG(
'my_dag',
default_args=default_args,
schedule_interval='@daily'
)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> end
def my_success_callback(context):
print("Task succeeded!")
def my_failure_callback(context):
print("Task failed!")
确保你的DAG已经正确导入并在Airflow中启动。
如果仍然无法调用回调函数,请检查Airflow日志以查看是否有任何与回调相关的错误消息。
通过按照上述步骤检查和调试,应该能够解决Apache Airflow DAG没有调用on_success_callback和on_failure_callback的问题。