在Airflow的DAG中,可以使用以下代码示例为失败和重试的任务定义不同的电子邮件列表:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.email import send_email
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 3
}
dag = DAG(
'example_dag',
default_args=default_args,
description='A simple DAG for demonstration purposes',
schedule_interval='0 0 * * *'
)
def send_email_on_failure(context):
subject = f'Airflow alert: {context["task_instance"].task_id}'
message = f'An error occurred in {context["task_instance"].task_id}'
send_email(
to=['fail@example.com'],
subject=subject,
html_content=message
)
run_this = BashOperator(
task_id='run_this',
bash_command='echo 1',
on_failure_callback=send_email_on_failure,
dag=dag
)
run_that = BashOperator(
task_id='run_that',
bash_command='echo 2',
dag=dag
)
run_this >> run_that
在这个示例中,我们定义了一个名为“send_email_on_failure”的Python函数,每当一个任务失败时,它都会被调用并向特定的电子邮件地址发送一条消息。然后,我们将这个函数分配给一个回调函数,它将在任务失败回调中调用。
在默认参数中,我们将“email_on_failure”和“email_on_retry”均设置为True,并为失败的任务指定了默认电子邮件列表。在任务级别上,我们还可以在每个任务的operator上使用“email”参数来指定电子邮件列表。
通过这种方式,我们就可以轻松地为Airflow中的失败和重试定义不同的电子邮件列表。