在 Airflow 中,如果您的 DAG 包含 Jinja 模板,那么在 DAG 运行时,这些模板的内容将被解析和替换。在某些情况下,可能会发现您的模板似乎没有被正确解析,这可能是由于变量未正确解析引起的。
要处理这种情况,您可以尝试使用以下代码片段:
from airflow.models import DAG
from airflow.utils.dates import days_ago
default_args = {
"owner": "airflow",
"start_date": days_ago(1),
}
with DAG(
dag_id="example_dag",
default_args=default_args,
schedule_interval="@daily",
) as dag:
# 将您的 Jinja 模板逐个传递给每个任务
task1 = BashOperator(
task_id="task1",
bash_command='echo "{{ ds }}"', # 示例模板内容
dag=dag,
)
task2 = BashOperator(
task_id="task2",
bash_command='echo "{{ ds }}"', # 示例模板内容
dag=dag,
)
# 确保在所有任务中使用模板时使用正确的模板上下文
dag.doc_md = """
## 测试DAG
"""
dag.template_env.globals.update(my_variable="Hello World")
在这个示例中,我们将 jinja syntax 的 {{ ds }} 传递给了两个BashOperator任务。此外,更新了模板变量上下文(my_variable)和DAG文档。
您可以使用类似的代码片段来解决您的问题。