问题是在Airflow 1.10.10的核心版本和1.10.15的日志版本之间存在兼容性问题,尤其是在AWS S3远程日志记录方面。解决此问题的方法是在Airflow的代码中对日志记录进行相应的更改。
以下是通过在Airflow的配置文件中更改日志记录设置来解决此问题的示例代码:
import boto3
import airflow
from airflow import configuration
from airflow.utils.log.logging_mixin import StreamLogWriter
s3 = boto3.client('s3', aws_access_key_id=CONFIG['access_key'],
aws_secret_access_key=CONFIG['secret_access_key'])
LOGGING_CONFIG = {
...,
'default': {
'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
'formatter': 'airflow.utils.log.file_task_handler.SMLogFormatter',
'filename': '''{AIRFLOW_HOME}/logs/airflow-worker.log''',
'base_log_folder': '{AIRFLOW_HOME}/logs',
's3_log_folder': '{{ ds }}',
'task_log_suffix': CONFIG.get('task_log_suffix', ''),
'log_level': LOG_LEVEL,
'end_of_log_mark': END_OF_LOG_MARK,
's3_key_suffix': '/'.join(['logs', '{{ ds }}']),
's3_additional_kwargs': {
'ServerSideEncryption': 'AES256',
'ACL': 'bucket-owner-full-control',
'StorageClass': 'STANDARD_IA',
},
's3_access_key_id': CONFIG['access_key'],
's3_secret_access_key': CONFIG['secret_access_key'],
's3_bucket': CONFIG['bucket'],
's3_prefix': '/'.join([CONFIG.get('key_prefix', ''), 'logs']),
},
...
}
通过上述操作,您可以解决Airflow 1.10.10和1.10.15之间的AWS S3远程日志记录兼容性问题。