可以使用 KubernetesExecutor 来指定超时时间,以便 Airflow 知道何时应将任务标记为失败。具体的代码示例如下:
在 dag 的默认参数中增加超时时间:
from datetime import timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=1),
'execution_timeout': timedelta(hours=2) # 这里增加了超时时间
}
在 airflow.cfg 中配置 executor:
executor = KubernetesExecutor
kubernetes_image =
worker_container_image =
kubeconfig_file =
如果需要更改 KubernetesExecutor 的默认值,可以在 airflow.cfg 中进行配置:
[kubernetes]
image_pull_secrets =
airflow_home = /usr/local/airflow
git_sync_root = /usr/local/airflow/var/git
dags_in_image = True
namespace = default
kube_config = /path/to/kube/config
in_cluster = False
config_file = /path/to/kube/config
delete_worker_pods = False
cmd_kube_client = kubectl