在 Airflow BashOperator 中使用 kubernetes_pod_operator 将 KubernetesPodOperator 的实例化更改为 Pod k8s API 对象的角色可以与 BashOperator 使用不同的 ServiceAccount 品质。以下是一个示例:
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 6, 9),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'kubernetes_pod_operator_example',
default_args=default_args,
schedule_interval=None,
)
with dag:
k = KubernetesPodOperator(
task_id='task1',
name='task1',
namespace='default',
image='busybox',
cmds=["sh", "-c", "echo $ROLE"],
env_vars={'ROLE': 'test'},
service_account_name='test-service-account',
in_cluster=True,
config_file='/etc/kubernetes/admin.conf',
dag=dag,
)
在这个例子中,当 BashOperator 执行时,它将使用函数中定义的 env_vars。而 kubernetes_pod_operator 将使用设置的 ServiceAccount 品质。可以使用这种方法来更好地控制在每个任务中使用的服务帐户。