According to the official documentation, Airflow on EKS does not require a Triggerer Pod for this setup. Instead, you can use the KubernetesExecutor, which launches each DAG task in its own Kubernetes Pod. Sample code follows.
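The executor is selected through Airflow's configuration. A minimal sketch of enabling it via environment variables (the namespace value here is an assumption that depends on your deployment; on Airflow 2.x the config section is `kubernetes`, and the same settings can live in `airflow.cfg` instead):

```shell
# Equivalent to "[core] executor = KubernetesExecutor" in airflow.cfg
export AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
# Namespace in which the executor launches worker pods (assumed value)
export AIRFLOW__KUBERNETES__NAMESPACE=airflow
```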
In KubernetesExecutor mode you need a Kubernetes cluster to run the tasks on. First, create the namespace and install the Airflow operator:

```shell
kubectl create namespace airflow
kubectl apply -f https://github.com/teamclairvoyant/kubernetes-operators/archive/airflow-operator-v1.10.15.yaml
kubectl apply -f https://github.com/teamclairvoyant/kubernetes-operators/blob/main/examples/airflow-rbac.yaml
kubectl apply -f https://github.com/teamclairvoyant/kubernetes-operators/blob/main/examples/airflow.yaml
```
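After applying the manifests, it is worth confirming that the operator's resources actually came up before moving on (the namespace name follows the `kubectl create namespace` command above):

```shell
# List the pods in the airflow namespace; wait until they are Running
kubectl get pods -n airflow
# If a pod is stuck in Pending or CrashLoopBackOff, inspect its events
kubectl describe pods -n airflow
```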
Next, include a KubernetesPodOperator task in your DAG definition:
```python
from datetime import timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'k8s_dag',
    default_args=default_args,
    description='A basic example DAG to run a k8s Pod',
    schedule_interval=timedelta(days=1),
)

k8s_task = KubernetesPodOperator(
    task_id='airflow_test_task',  # every Airflow task requires a task_id
    namespace='default',
    image='airflow',
    cmds=['python', '-c'],
    arguments=["print('hello world')"],
    labels={'foo': 'bar'},
    name='airflow-test-pod',
    dag=dag,
    is_delete_operator_pod=True,  # clean up the pod once the task finishes
    hostnetwork=False,
)
```
Once this DAG runs, Airflow creates a Kubernetes Pod and executes the task inside it. You can follow the task's status and logs in the Airflow UI.
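To exercise the DAG, you can trigger it manually from the Airflow CLI and watch the pod the operator launches (Airflow 2.x command names; the label selector matches the `labels` set on the operator above):

```shell
# Trigger a manual run of the DAG defined above
airflow dags trigger k8s_dag
# Watch the task pod appear in the operator's target namespace
kubectl get pods -n default -l foo=bar --watch
```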
Note that KubernetesExecutor mode requires the corresponding Kubernetes Python client library to be installed in the Airflow containers so that Airflow can talk to the Kubernetes API.
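On Airflow 2.x the Kubernetes client is pulled in by the CNCF Kubernetes provider package, so installing that in your image is usually sufficient (on Airflow 1.10 the equivalent was the `apache-airflow[kubernetes]` extra):

```shell
# Installs KubernetesPodOperator and the kubernetes Python client it depends on
pip install apache-airflow-providers-cncf-kubernetes
```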