Airflow PythonOperator 任务通常会自动失败或成功,并根据返回值(True 或 False)确定它们的执行状态。
要确保PythonOperator任务能够正确地失败,请使用 PythonRaiseError 函数。PythonRaiseError 函数将引发异常,导致任务以失败状态终止。
例如,以下代码段定义了一个PythonOperator任务,并在某些条件下失败:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def some_complex_logic():
# some complex logic here
if some_condition:
raise ValueError("Task failed due to some error.")
dag = DAG(
dag_id='example_python_operator',
schedule_interval=None,
start_date=dt.datetime(2021, 1, 1),
)
task = PythonOperator(
task_id='example',
python_callable=some_complex_logic,
dag=dag,
)
上述代码段中,如果“some_condition”求值为真,则PythonOperator任务将引发 ValueError 异常,并且任务将以失败状态终止。这将确保任务能够正确地处理异常情况并根据需要失败。