你可以使用jinja模板来从dag_run.conf中获取值。
首先,在你的ECSOperator中,定义一个参数 template_fields
,将 'template_fields = ('task_definition', 'overrides')' 添加到ECSOperator。
如下所示:
from airflow.contrib.operators.ecs_operator import ECSOperator
class MyECSOperator(ECSOperator):
template_fields = ('task_definition', 'overrides')
然后,在您的任务定义(task definition)字段中,通过使用 {{ dag_run.conf }}
来访问 dag_run.conf
中的值。
如下所示:
task_definition = """
{
"family": "example-task",
"containerDefinitions": [
{
"image": "your-docker-image",
"name": "example-container",
"overrides": {{ overrides | to_json }},
"environment": [
{
"name": "MY_ENV_VAR",
"value": "{{ dag_run.conf['MY_ENV_VAR'] }}"
}
]
}
]
}
"""
如果您的 dag_run.conf
看起来像这样:
{
"MY_ENV_VAR": "example value"
}
那么通过以上代码,在运行 EcsOperator 时, MY_ENV_VAR
的值将替换为 example value
。