可以使用 Airflow 的 TimeDeltaSensor 传感器等待一段指定的时间(delta),以此检测任务是否运行超时,并通过 on_failure_callback 参数配置一个回调函数来处理任务失败的情况。具体步骤如下:
from airflow.models import Variable
from airflow.sensors.time_delta_sensor import TimeDeltaSensor
from datetime import timedelta

# Airflow Variables come back as strings, but TimeDeltaSensor needs a
# timedelta for ``delta`` and a number of seconds for ``timeout``, so convert
# once here where the values are read.
# NOTE(review): the "wait_time" Variable is assumed to hold a number of
# seconds (same unit as the sensor timeout) — confirm how it is populated.
wait_time = timedelta(seconds=float(Variable.get("wait_time")))
sensor_timeout = float(Variable.get("sensor_timeout"))
# First version of the sensor task, without any failure callback.
check_duration = TimeDeltaSensor(
    dag=dag,
    task_id='check_duration',
    mode='reschedule',
    poke_interval=60,  # check once every 60 seconds
    timeout=sensor_timeout,
    delta=wait_time,
)
def failure_callback(context):
    """Raise a ``ValueError`` that names the failed task.

    Meant to be passed as a task's ``on_failure_callback``.

    Args:
        context: Mapping that must contain ``'task_instance'``, an object
            exposing a ``task_id`` attribute.

    Raises:
        ValueError: Always; the message contains the task id and the
            module-level ``wait_time`` value.
    """
    # NOTE(review): Airflow calls on_failure_callback after the task has
    # already failed, so this exception presumably only shows up in the logs
    # rather than causing the failure — confirm this is the intended effect.
    failed_task_id = context['task_instance'].task_id
    raise ValueError(
        "Task {} failed because it ran for more than {}".format(
            failed_task_id, wait_time
        )
    )
# Same sensor task as before, now with the failure callback registered.
check_duration = TimeDeltaSensor(
    dag=dag,
    task_id='check_duration',
    mode='reschedule',
    poke_interval=60,
    timeout=sensor_timeout,
    delta=wait_time,
    on_failure_callback=failure_callback,  # register the callback
)
这样,如果传感器在 sensor_timeout 指定的超时时间(例如 10 小时)内没有完成,任务就会失败,并调用 failure_callback 回调函数。
完整的 DAG 定义示例如下所示:
from datetime import datetime, timedelta
from airflow.models import DAG, Variable
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.time_delta_sensor import TimeDeltaSensor
# Arguments shared by every task of the DAG.
default_args = {
    # Ownership and scheduling
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2022, 1, 1),
    # E-mail notifications are switched off
    'email_on_failure': False,
    'email_on_retry': False,
    # Retry once, five minutes after a failure
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# The example DAG: scheduled daily, with catchup disabled.
dag = DAG(
    'example_dag',
    schedule_interval='@daily',
    catchup=False,
    default_args=default_args,
)
# Airflow Variables come back as strings, but TimeDeltaSensor needs a
# timedelta for ``delta`` and a number of seconds for ``timeout``, so convert
# once here where the values are read.
# NOTE(review): the "wait_time" Variable is assumed to hold a number of
# seconds (same unit as the sensor timeout) — confirm how it is populated.
wait_time = timedelta(seconds=float(Variable.get("wait_time")))
sensor_timeout = float(Variable.get("sensor_timeout"))
def failure_callback(context):
    """Raise a ``ValueError`` that names the failed task.

    Meant to be passed as a task's ``on_failure_callback``.

    Args:
        context: Mapping that must contain ``'task_instance'``, an object
            exposing a ``task_id`` attribute.

    Raises:
        ValueError: Always; the message contains the task id and the
            module-level ``wait_time`` value.
    """
    # NOTE(review): Airflow calls on_failure_callback after the task has
    # already failed, so this exception presumably only shows up in the logs
    # rather than causing the failure — confirm this is the intended effect.
    failed_task_id = context['task_instance'].task_id
    raise ValueError(
        "Task {} failed because it ran for more than {}".format(
            failed_task_id, wait_time
        )
    )
# Sensor task of the example DAG, with the failure callback registered.
check_duration = TimeDeltaSensor(
    dag=dag,
    task_id='check_duration',
    mode='reschedule',
    poke_interval=60,
    timeout=sensor_timeout,
    delta=wait_time,
    on_failure_callback=failure_callback,
)
def example_function():
    """Placeholder callable for ``example_task``; replace with the real work."""
    print("example_task executed")


# Downstream task; it references example_function, which must be defined
# before this point or the DAG file fails to import with a NameError.
example_task = PythonOperator(
    task_id='example_task',
    python_callable=example_function,
    dag=dag,
)

# check_duration is upstream of example_task.
check_duration >> example_task