近期,Airflow社群发现在Airflow 2.0.1版中,使用KubernetesExecutor时,Pod模板覆盖未按预期工作。具体而言,通过设置dag的default_args参数中的'kube_pod_template_file”属性以覆盖默认Pod模板文件时,KubernetesExecutor未能使用新的模板文件,而是继续使用默认的模板文件。
为了解决这个问题,我们可以使用'pod_override”hook。这个hook是在Executor创建Pod对象时调用的。我们只需要在我们的dag文件中使用这个hook来覆盖默认设置。下面是一个例子:
from airflow.models.baseoperator import BaseOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
class MyKubernetesPodOperator(KubernetesPodOperator):
def execute(self, context):
# Load new pod template here
self.pod = self.create_new_pod()
# Call the parent execute function
return super().execute(context)
class MyOperator(BaseOperator):
def execute(self, context):
# Define pod template here
new_pod_template = {...}
# Create the operator and override the pod
operator = MyKubernetesPodOperator(
task_id='...',
name='...',
namespace='...',
image='...',
cmds=[...],
arguments=[...],
pod_template_file=None,
pod_override=new_pod_template,
dag=dag
)
operator.execute(context)
return 'done'
这个例子中,我们创建了一个自定义的KubernetesPodOperator,覆盖了它的execute()方法来加载新的Pod模板并创建任务所需的Pod对象。然后,我们在我们的dag中使用新的自定义操作符(MyOperator),使用自定义的Pod模板运行任务。这保证了Pod模板覆盖按预期工作,而且可以根据需要使用自定义的Pod模板。