在Airflow中,使用f字符串获取xcom值会导致Jinja未定义错误。为了避免此错误,可以将f字符串转换为格式化字符串。这可以通过在f字符串前面添加“%”来完成。例如,要获取来自xcom的任务ID并将其插入SQL查询中,请使用以下代码:
from airflow.operators.postgres_operator import PostgresOperator
task_id = '{{ ti.xcom_pull(task_ids="my_task_id") }}'
sql = "SELECT * FROM my_table WHERE id=%s"
postgres_task = PostgresOperator(
task_id='my_postgres_task',
sql=sql % task_id,
postgres_conn_id='my_postgres_conn',
autocommit=True
)
在这个例子中,我们把f字符串 %s
转换为格式化字符串,使格式化字符串与Jinja无关,从而避免了未定义错误。