通常出现此问题是由于在SFTPHook的配置中使用了私钥,但是该私钥的权限或文件路径不正确导致认证失败。解决方法可以尝试以下步骤:
airflow clear -f -d -q
systemctl restart airflow-webserver
systemctl restart airflow-scheduler
systemctl restart airflow-worker
from airflow.contrib.hooks import SSHHook
from airflow.contrib.hooks.ssh_hook import SSHHookMixin
from paramiko import AuthenticationException
class SFTPHook(SSHHook, SSHHookMixin):
def __init__(self, ftp_conn_id=None, ssh_conn_id=None, *args, **kwargs):
super().__init__(ssh_conn_id=ssh_conn_id)
self.ftp_conn_id = ftp_conn_id
self.conn = self.get_connection(ftp_conn_id)
def get_conn(self):
if self.conn.password:
self.password_authenticate()
else:
self.import_private_key()
transport = self.conn.ssh_client.get_transport()
return transport.open_sftp()
def import_private_key(self):
try:
self.conn.ssh_client.load_system_host_keys()
pkey = None
if self.conn.private_key:
pkey = self._read_private_key()
elif self.conn.extra_dejson.get('private_key'):
pkey = self._read_private_key(file_path=self.conn.extra_dejson.get('private_key'))
self.conn.ssh_client.connect(
hostname=self.conn.host,
username=self.conn.login,
port=self.conn.port,
pkey=pkey,
)
except AuthenticationException as auth_error:
raise AirflowException(f"Failed to authenticate to SFTP using private key: {auth_error}")
def password_authenticate(self):
try:
self.conn.ssh_client.connect(
hostname=self.conn.host,
username=self.conn.login,
password=self.conn.password,
port=self.conn.port,
timeout=self.timeout,
)
except Exception as e:
raise AirflowException(f"Failed to authenticate to SFTP using password: {e}")
sftp_hook = SFTPHook(
ftp_conn_id='example_sftp_conn',
ssh_conn_id='example_ssh_conn',
)