这通常是由于默认情况下Airflow任务运行的超时时间导致的。在Docker容器或Kubernetes Pod环境下尤其常见。您可以使用以下代码将超时时间延长到您需要的时间:
from airflow.exceptions import AirflowTaskTimeout
from airflow.models.taskinstance import TaskInstance
from datetime import timedelta
def my_function(ds, **kwargs):
task_instance = TaskInstance(kwargs['ti'].task, kwargs['ti'].execution_date)
# 设置任务的超时时间为1小时(可按需更改)
task_instance.max_tries = 1
task_instance.try_number = 1
task_instance.executor_config = {'airflow.task_runner': {'max_tries': 1}}
task_instance.start_date = task_instance.current_dag_run.start_date
task_instance.end_date = task_instance.start_date + timedelta(hours=1)
try:
# 调用您的函数/任务
my_task_function()
except AirflowTaskTimeout as timeout_exception:
# 处理超时错误
raise timeout_exception
通过设置任务的max_tries
和try_number
属性,您可以使用executor_config
设置任务的超时时间。在这个例子中,我们设置了超时时间为1小时。如果任务在超时时间内仍未完成,会引发AirflowTaskTimeout
异常,您可以在这里处理该异常。