在Airflow 2.0版本中,引入了新的日志记录系统,需要更新相关配置才能正常使用远程日志记录。具体做法如下:
首先,在远程服务器上创建一个文件夹作为存放日志的目录,然后更新Airflow的配置文件airflow.cfg,添加如下内容:
[logger_remote]
level = INFO
handlers = remote
qualname = remote
propagate = 0
[handler_remote]
class = airflow.utils.log.remote_logging.RemoteLogHandler
formatter = json_formatter
username = REMOTE_SERVER_USERNAME
password = REMOTE_SERVER_PASSWORD
host = REMOTE_SERVER_HOST
port = REMOTE_SERVER_PORT
base_log_folder = /path/to/remote/logs
sftp_conn_id = REMOTE_CONN_ID
其中,REMOTE_SERVER_USERNAME 和 REMOTE_SERVER_PASSWORD 分别是远程服务器的用户名和密码,REMOTE_SERVER_HOST 和 REMOTE_SERVER_PORT 是远程服务器的地址和端口号,/path/to/remote/logs 是存放日志的目录路径,REMOTE_CONN_ID 是远程服务器的连接ID。
然后,在Airflow的 dag 文件中添加以下代码:
from airflow.utils.log.logging_mixin import LoggingMixin
class MyOperator(BaseOperator, LoggingMixin):
def execute(self, context):
self.log.info("This is a remote logging test")
其中,MyOperator 是自定义的Operator类,在 execute 方法中可以使用 log 记录日志,这样就可以将日志发送到远程服务器的指定目录了。
需要注意的是,日志的格式需要与上面的配置文件中的 formatter 一致,这里使用了默认的 json_formatter。除此之外,还需要确保远程服务器上的SSH服务已经开启,并且sftp_conn_id在Airflow中已经配置完毕。
完成上述配置之后,重新启动Airflow服务,就可以使用远程日志记录功能了。