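In an Airflow DAG you can send a notification email when a task fails, either through the `email_on_failure` setting or through a dedicated email task. Here is example code: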
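from airflow import DAG
# Airflow 2.x import paths. Airflow 1.10 used airflow.operators.python_operator
# and airflow.operators.email_operator instead.
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta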
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 8, 1),
    'email_on_failure': True,
    'email': ['airflow@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)
def task1():
    # do something
    pass
def task2():
    # do something that may fail
    pass
def task3():
    # do something
    pass
task_1 = PythonOperator(
    task_id='task_1',
    python_callable=task1,
    dag=dag,
)
task_2 = PythonOperator(
    task_id='task_2',
    python_callable=task2,
    dag=dag,
    email_on_failure=True,  # send a notification email if this task fails
)
task_3 = PythonOperator(
    task_id='task_3',
    python_callable=task3,
    dag=dag,
)
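email_task = EmailOperator(
    task_id='email_task',
    to=['admin@example.com'],
    subject='Airflow Task Failed',
    html_content="Airflow task failed. Please check your DAG.",
    # The default trigger rule (all_success) would run this task only when
    # every upstream task succeeds. one_failed runs it when an upstream task
    # has failed instead.
    trigger_rule='one_failed',
    dag=dag,
)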
task_1 >> task_2 >> task_3 >> email_task
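In this DAG, when task_2 fails, Airflow automatically sends a notification email to the addresses in the `email` list of `default_args`. This requires SMTP to be configured for the Airflow deployment.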
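If you want the email to say which task failed and why, one option is an `on_failure_callback`. The sketch below is only an illustration: `build_failure_email` and `notify_failure` are hypothetical helper names, the recipient address is copied from the example above, and the demo context is a stand-in for the context dictionary Airflow passes to the callback. It assumes `airflow.utils.email.send_email` is available in your Airflow version.

```python
import html
from types import SimpleNamespace


def build_failure_email(context):
    """Build (subject, html_body) from the context Airflow passes to a callback."""
    ti = context["task_instance"]
    # The exception text is escaped so it cannot break the HTML body.
    error = html.escape(str(context.get("exception")))
    subject = f"Airflow task failed: {ti.dag_id}.{ti.task_id}"
    html_body = (
        f"<p>Task <b>{ti.task_id}</b> in DAG <b>{ti.dag_id}</b> failed.</p>"
        f"<p>Error: {error}</p>"
        f"<p>Log: {ti.log_url}</p>"
    )
    return subject, html_body


def notify_failure(context, send=None):
    """Hypothetical failure callback: emails details of the failed task."""
    subject, html_body = build_failure_email(context)
    if send is None:
        # Imported lazily so build_failure_email can be tried without Airflow.
        from airflow.utils.email import send_email as send
    send(to=["admin@example.com"], subject=subject, html_content=html_body)


# Stand-in for a real Airflow context, used only to try the helper locally.
demo_context = {
    "task_instance": SimpleNamespace(
        dag_id="example_dag",
        task_id="task_2",
        log_url="http://localhost:8080/log",
    ),
    "exception": ValueError("bad <input>"),
}
subject, html_body = build_failure_email(demo_context)
```

To use it, pass `on_failure_callback=notify_failure` to an operator, or put it in `default_args` so it applies to every task in the DAG.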