要在Airflow DAG中当任务失败时发送自定义邮件,可以按照以下步骤进行操作:
首先,确保你已经设置了Airflow的邮件配置。你需要在Airflow的配置文件中配置SMTP服务器和收件人等相关信息。你可以在airflow.cfg
文件中找到这些配置项。配置文件位于$AIRFLOW_HOME/airflow.cfg
。
创建一个自定义的Operator,用于发送自定义邮件。你可以使用Python的smtplib
库来实现这个功能。以下是一个示例代码:
import smtplib
from email.mime.text import MIMEText
from airflow.operators.email_operator import EmailOperator
class CustomEmailOperator(EmailOperator):
def execute(self, context):
# 获取任务实例的状态
ti = context["task_instance"]
task_state = ti.current_state()
if task_state == "failed":
msg = MIMEText("任务失败了,请检查日志!")
msg["Subject"] = "任务失败通知"
msg["From"] = "sender@example.com"
msg["To"] = "recipient@example.com"
# 连接到SMTP服务器并发送邮件
smtp_server = smtplib.SMTP("smtp.example.com")
smtp_server.send_message(msg)
smtp_server.quit()
super().execute(context)
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
}
with DAG('custom_email_dag', default_args=default_args, schedule_interval=None) as dag:
task1 = BashOperator(
task_id='task1',
bash_command='echo "Task 1"',
)
task2 = BashOperator(
task_id='task2',
bash_command='exit 1',
)
task3 = CustomEmailOperator(
task_id='send_custom_email',
to='recipient@example.com',
subject='Custom Email',
html_content='This is a custom email.
',
)
task1 >> task2 >> task3
在这个示例中,当task2
失败时,send_custom_email
任务会发送一封自定义的邮件。
注意:在使用自定义的Operator之前,确保你已经安装了Airflow。