可能的原因是在Kubernetes集群中正确配置了Airflow Worker Pod,但Airflow Web-UI无法访问Worker Pod状态。 需要确保Worker Pod的IP地址和端口在Web-UI中可用。
可以尝试使用以下代码解决此问题:
使用kubectl命令获取Worker Pod的IP地址和端口号。
kubectl get pods -o wide
在Web-UI中添加Worker Pod的IP地址和端口号。
# Create airflow.cfg
def create_airflow_cfg(namespace, worker_pod_name, worker_port):
with open("airflow.cfg", 'w') as f:
f.write(f"""
[core]
executors = KubernetesExecutor
sql_alchemy_conn = mysql://root:airflow@airflow-mysql.{namespace}.svc.cluster.local:3306/airflow
[kubernetes]
in_cluster = True
namespace = {namespace}
[operator_defaults]
namespace = {namespace}
[kubernetes_node_selectors]
pool_default_node_selector = airflow-worker={worker_pod_name}
[kubernetes_pod_template]
name = airflow-tenant
in_cluster = True
termination_grace_period = 180
image_pull_secrets = b64_aW1hZ2VzL3Rlc3Q=
airflow_configmap = airflow-tenant-config
cpu_request = 500m
cpu_limit = 1000m
ram_request = 1Gi
ram_limit = 2Gi
volume_mounts = '{{"name": "postgres-data", "mountPath": "{}"}}\n{{"name": "dag-repo","mountPath": "{}"}}'.format(dag_volume_mount, dag_volume_mount)
[webserver]
dag_default_view = graph
enable_xcom_debugger = True
base_url = /{namespace}/airflow/
web_server_host = 0.0.0.0
web_server_port = 8080
worker_refresh_batch_size = 1
[worker]
do_xcom_push = True
base_url = http://{worker_pod_name}:{worker_port}
""")
if __name__ == '__main__':
create_airflow_cfg('tenant-1', 'airflow-worker-1', 8793)
airflow_home = "/opt/airflow"
cmd = f"airflow webserver -p 8080"
proc = subprocess.Popen(cmd.split(), cwd=airflow_home)
print(f"Airflow webserver started with pid {proc.pid}")
该代码将在Airflow Web-UI中添加Worker Pod的IP地址和端口号,并确保Web-UI可以