可以通过使用 Airflow 的 hooks 和 sensors,实现在连续 2 次任务失败时进行通知。
以下是一个示例代码:
from airflow.sensors.base import BaseSensorOperator
from airflow.hooks.base_hook import BaseHook
from typing import Optional, Tuple
class CustomSensor(BaseSensorOperator):
def __init__(self,
task_id: str,
conn_id: str,
num_failures: int,
*args,
**kwargs) -> None:
super().__init__(task_id=task_id, *args, **kwargs)
self.conn_id = conn_id
self.num_failures = num_failures
def poke(self, context) -> bool:
hook = BaseHook.get_hook(conn_id=self.conn_id)
result = hook.get_results()
if len(result) > self.num_failures:
return True
return False
from airflow import DAG
from datetime import datetime, timedelta
from custom_plugins import CustomSensor
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'email': ['youremail@yourdomain.com']
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
with DAG('my_dag', default_args=default_args, schedule_interval=timedelta(days=1)) as dag:
task_a = BashOperator(task_id='task_a', bash_command='echo 1')
task_b = BashOperator(task_id='task_b', bash_command='echo 2')
check_task = CustomSensor(
task_id='check_task',
num_failures=2,
conn_id='my_conn',
poke_interval=timedelta(minutes=1)
)
notify_task = EmailOperator(
task_id='notify_task',
subject='Two consecutive task failures',
to='youremail@yourdomain