Airflow中的Jinja模板在解析文本时会进行两次解析,这会导致一些不必要的计算和延迟。例如,如果我们有一个任务从一个SQL查询中检索数据,我们希望将结果保存到GCS中,那么我们可能会有以下代码片段:
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
def my_function():
bq_hook = BigQueryHook()
gcs_hook = GoogleCloudStorageHook()
# Run the query and save the result to a GCS file
query = "SELECT * FROM my_table"
gcs_hook.upload(bucket="my_bucket", object="result.csv", data=bq_hook.get_pandas_df(query))
在调用get_pandas_df()
函数时,Airflow会首先解析Jinja模板,以确定查询是否包含模板变量。然后,在查询被传递到BigQueryHook的run_query()
函数中时,Airflow会再次解析Jinja模板,以根据可用的变量填充查询。
这种双重解析可能会导致任务延迟,并消耗更多的CPU资源。为了避免这种情况,我们可以通过将查询字符串传递给preprocess_query()
函数来进行一次手动的Jinja模板解析。这样,在get_pandas_df()
被调用时,查询字符串已经是解析过的,并且不再需要进行解析。我们可以像下面这样来实现:
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from jinja2 import Environment, BaseLoader
def my_function():
bq_hook = BigQueryHook()
gcs_hook = GoogleCloudStorageHook()
# Run the query and save the result to a GCS file
query = Environment(loader=BaseLoader).from_string("SELECT * FROM my_table").render()
gcs_hook.upload(bucket="my_bucket", object="result.csv", data=bq_hook.get_pandas_df(query))
在这个例子中,我们手动将查询字符串传递给Jinja模板,并使用render()
函数将其解析。现在,当我们将查询传递给get_pandas_df()
函数时,我们不再需要解析Jinja模板,从而减少