你可以使用jinja模板来从dag_run.conf中获取值。
首先,在你的ECSOperator中,定义一个参数 template_fields,将 'template_fields = ('task_definition', 'overrides')' 添加到ECSOperator。
如下所示:
from airflow.contrib.operators.ecs_operator import ECSOperator
class MyECSOperator(ECSOperator):
    template_fields = ('task_definition', 'overrides')
然后,在您的任务定义(task definition)字段中,通过使用 {{ dag_run.conf }} 来访问 dag_run.conf 中的值。
如下所示:
task_definition = """
{
    "family": "example-task",
    "containerDefinitions": [
        {
            "image": "your-docker-image",
            "name": "example-container",
            "overrides": {{ overrides | to_json }},
            "environment": [
              {
                  "name": "MY_ENV_VAR",
                  "value": "{{ dag_run.conf['MY_ENV_VAR'] }}"
              }
            ]
        }
    ]
}
"""
如果您的 dag_run.conf 看起来像这样:
{
    "MY_ENV_VAR": "example value"
}
那么通过以上代码,在运行 EcsOperator 时, MY_ENV_VAR 的值将替换为 example value。