确认Airflow配置文件中正确设置了DAG中任务的重试次数和重试间隔,并重新加载配置文件。 例如,在airflow.cfg中配置:
retry_delay = timedelta(minutes=1) retries = 3
确认任务代码中有适当的日志记录和错误处理语句。在任务失败时记录日志并重新抛出异常,使Airflow能够正确识别任务失败。 例如:
def my_task():
try:
# some code
except Exception as e:
logging.error("Error in my_task: {}".format(e))
raise e
def my_task():
if not some_condition:
return False # failure
else:
return True # success
def set_xcom_value(**kwargs):
value = 'my_value'
task_instance = kwargs['ti']
task_instance.xcom_push(key='my_key', value=value)
在另一个任务中获取XCom值:
def get_xcom_value(**kwargs):
task_instance = kwargs['ti']
value = task_instance.xcom_pull(key='my_key')
if not value:
raise ValueError('Failed to retrieve XCom value.')
else:
# do something with the value
from airflow import