要配置Airflow的邮件设置,需要进行以下步骤:
打开Airflow的配置文件 airflow.cfg
,默认位于 $AIRFLOW_HOME/airflow.cfg
。可以使用文本编辑器打开该文件。
在配置文件中找到 [smtp]
部分,该部分用于配置SMTP服务器的设置。如果没有找到该部分,可以手动添加以下内容:
[smtp]
smtp_host = YOUR_SMTP_HOST
smtp_starttls = True
smtp_ssl = False
smtp_user = YOUR_SMTP_USERNAME
smtp_password = YOUR_SMTP_PASSWORD
smtp_port = YOUR_SMTP_PORT
smtp_mail_from = YOUR_SMTP_SENDER_EMAIL
将 YOUR_SMTP_HOST
替换为你的SMTP服务器的主机名或IP地址,将 YOUR_SMTP_USERNAME
和 YOUR_SMTP_PASSWORD
替换为SMTP服务器的用户名和密码。将 YOUR_SMTP_PORT
替换为SMTP服务器的端口号,通常为25、465或587。将 YOUR_SMTP_SENDER_EMAIL
替换为用于发送电子邮件的发件人邮箱。
保存并关闭配置文件。
在Airflow的安装目录中创建一个名为 smtp_relay.py
的文件,该文件用于配置SMTP服务器的中继设置。在该文件中添加以下代码:
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
def send_email(to, subject, html_content, files=None):
msg = MIMEMultipart('alternative')
msg['Subject'] = Header(subject, 'utf-8')
msg['From'] = 'YOUR_SMTP_SENDER_EMAIL'
msg['To'] = to
part = MIMEText(html_content, 'html', 'utf-8')
msg.attach(part)
# Attach files
if files:
for file in files:
attachment = MIMEText(open(file, 'rb').read(), 'base64', 'utf-8')
attachment['Content-Disposition'] = 'attachment; filename="%s"' % file
msg.attach(attachment)
# Send email
smtp_host = 'YOUR_SMTP_HOST'
smtp_port = YOUR_SMTP_PORT
smtp_user = 'YOUR_SMTP_USERNAME'
smtp_password = 'YOUR_SMTP_PASSWORD'
smtp = smtplib.SMTP(smtp_host, smtp_port)
smtp.starttls()
smtp.login(smtp_user, smtp_password)
smtp.sendmail(msg['From'], to, msg.as_string())
smtp.quit()
将 YOUR_SMTP_SENDER_EMAIL
替换为用于发送电子邮件的发件人邮箱,将 YOUR_SMTP_HOST
、YOUR_SMTP_PORT
、YOUR_SMTP_USERNAME
和 YOUR_SMTP_PASSWORD
替换为之前在配置文件中设置的SMTP服务器的值。
保存并关闭 smtp_relay.py
文件。
现在你可以在Airflow的DAG文件中使用 send_email
函数来发送电子邮件。例如:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def send_notification_email():
send_email('recipient@example.com', 'Airflow Notification', 'This is a test email from Airflow.')
with DAG('email_example', start_date=datetime(2022, 1, 1)) as dag:
send_email_task = PythonOperator(
task_id='send_email_task',
python_callable=send_notification_email
)
在上面的示例中,当DAG运行时,send_notification_email
函数将被调用,并使用 send_email
函数发送电子邮件给指定的收件人。
这样,你就可以配置Airflow的邮件设置并在DAG中使用代码发送电子邮件了。记得根据你的实际情况修改配置和代码中的值。