在 Airflow 的 DAG 文件中,我们可以使用 SnowflakeOperator
来连接 Snowflake 并执行相关操作。该操作器的构造函数中包含参数 session_parameters
,可以用来传递 Snowflake 的配置参数。
例如,以下代码展示了如何使用 SnowflakeOperator
来连接 Snowflake,并使用 session_parameters
来传递 QUERY_TAG
配置参数。
from datetime import datetime
from airflow import DAG
from airflow.operators.snowflake import SnowflakeOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 1, 1),
'retries': 1,
}
with DAG('my_snowflake_dag', default_args=default_args, schedule_interval=None) as dag:
snowflake_task = SnowflakeOperator(
task_id='my_snowflake_query',
sql='SELECT * FROM my_table',
snowflake_conn_id='my_snowflake_conn',
session_parameters={
'QUERY_TAG': 'my_tag'
}
)
在上述代码中,我们创建了一个名为 my_snowflake_dag
的 DAG,并使用 SnowflakeOperator 来执行一个 SQL 查询操作。我们将 snowflake_conn_id
设置为我们在 Airflow 中配置的 Snowflake 连接 ID,而 session_parameters
中包含一个 QUERY_TAG
配置参数,其值为 'my_tag'
。
由于我们将该配置参数传递给了 SnowflakeOperator
,因此该查询操作将会携带 'my_tag'
标记,并在 Snowflake 控制台的查询历史记录中体现。