在 Airflow DAG 中使用 PythonOperator,并将 XCom 数据作为参数传递给 SQL 查询。在 SQL 查询中使用 cast() 函数将双引号字符串转换为单引号字符串。示例代码如下:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.snowflake_operator import SnowflakeOperator
from datetime import datetime
def xcom_to_snowflake_query(**context):
xcom_data = context['ti'].xcom_pull(key='my_key')
snowflake_query = "SELECT column FROM my_table WHERE column = " + cast(xcom_data as varchar) + ";"
return snowflake_query
dag = DAG('my_dag', start_date=datetime(2021, 1, 1))
task1 = PythonOperator(
task_id='my_task',
provide_context=True,
python_callable=xcom_to_snowflake_query,
dag=dag)
task2 = SnowflakeOperator(
task_id='my_query',
sql=task1.output,
dag=dag)
在上述代码中,xcom_to_snowflake_query()
函数从 XCom 中检索数据,并将其转换为用于 Snowflake 查询的单引号字符串。任务 task1
是一个 PythonOperator,用于运行 xcom_to_snowflake_query()
函数。任务 task2
使用 SnowflakeOperator 运行从 task1
传递的 SQL 查询。