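First, add the following code to your Airflow DAG file. It connects to the Snowflake data warehouse through an Airflow connection with the ID snowflake_conn, which must already be defined in Airflow (for example under Admin > Connections in the UI):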
# Airflow 2.x import paths (from the apache-airflow-providers-snowflake package)
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
def create_table():
    # Credentials come from the Airflow connection 'snowflake_conn'
    hook = SnowflakeHook(snowflake_conn_id='snowflake_conn')
    conn = hook.get_conn()
    cursor = conn.cursor()

    create_table_command = """
    CREATE TABLE IF NOT EXISTS my_table (
        id INTEGER,
        name VARCHAR(25),
        date DATE
    );
    """

    try:
        cursor.execute(create_table_command)
        conn.commit()
    finally:
        # Release the cursor and connection even if the statement fails
        cursor.close()
        conn.close()
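The connection returned by SnowflakeHook.get_conn() is a Snowflake Python connector connection, which follows the Python DB-API 2.0 interface (cursor, execute, commit, close). The sketch below uses the standard-library sqlite3 module purely as a stand-in so it runs without a Snowflake account; it illustrates the cursor/commit/cleanup pattern and the idempotence of CREATE TABLE IF NOT EXISTS, not Snowflake itself. The table definition mirrors the one above.

```python
import sqlite3
from contextlib import closing

# Same table as above; sqlite3 is only a stand-in for the Snowflake connection.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS my_table (
    id INTEGER,
    name VARCHAR(25),
    date DATE
)
"""


def create_table(conn):
    """Run the DDL on any DB-API 2.0 connection, always closing the cursor."""
    with closing(conn.cursor()) as cursor:
        cursor.execute(CREATE_TABLE_SQL)
    conn.commit()


conn = sqlite3.connect(":memory:")
try:
    create_table(conn)
    create_table(conn)  # IF NOT EXISTS makes the second run a no-op
    columns = [row[1] for row in conn.execute("PRAGMA table_info(my_table)")]
    print(columns)  # ['id', 'name', 'date']
finally:
    conn.close()
```

Because the statement uses IF NOT EXISTS, the task can be retried or rerun by Airflow without failing on an existing table.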
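Then add a task to your DAG that runs this function. SnowflakeOperator executes SQL statements, not Python callables, so a SnowflakeOperator with sql='SELECT 1' would only run a test query and never create the table. To run create_table, use a PythonOperator instead (dag is the DAG object defined elsewhere in the file):

from airflow.operators.python import PythonOperator

create_table_task = PythonOperator(
    task_id='create_table',
    python_callable=create_table,
    dag=dag,
)

Alternatively, skip the Python function and pass the CREATE TABLE statement directly as the sql argument of SnowflakeOperator.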
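Finally, set this task upstream of the other tasks in your DAG so the table exists before anything that uses it runs, for example create_table_task >> load_task, where load_task stands for one of your own downstream tasks.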
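All of the steps above assume an Airflow connection named snowflake_conn already exists. One way to define it, assuming Airflow 2.3 or later (which accepts JSON-serialized connections in AIRFLOW_CONN_<CONN_ID> environment variables), is sketched below. The helper name snowflake_conn_env and all credential values are hypothetical placeholders, and the unprefixed extra keys (account, warehouse, database, role) are an assumption that holds for recent versions of the Snowflake provider; older versions may expect prefixed keys such as extra__snowflake__account.

```python
import json
import shlex


def snowflake_conn_env(conn_id, login, password, account, warehouse,
                       database, schema, role):
    """Build a hypothetical AIRFLOW_CONN_<ID> env var holding a JSON connection."""
    # Airflow resolves a connection ID from the env var AIRFLOW_CONN_ + ID upper-cased.
    name = "AIRFLOW_CONN_" + conn_id.upper()
    value = json.dumps({
        "conn_type": "snowflake",
        "login": login,
        "password": password,
        "schema": schema,
        # Snowflake-specific settings go in the connection's extra field.
        "extra": {
            "account": account,
            "warehouse": warehouse,
            "database": database,
            "role": role,
        },
    })
    return name, value


# Placeholder values only; substitute your real account details.
name, value = snowflake_conn_env(
    "snowflake_conn", "my_user", "my_password",
    "my_account", "my_warehouse", "my_database", "PUBLIC", "my_role",
)
# Emit a shell line to add to the scheduler and worker environments.
print(f"export {name}={shlex.quote(value)}")
```

The variable must be visible to the Airflow scheduler and workers, not just to the DAG file; defining the same connection in the UI under Admin > Connections is an equivalent alternative that keeps the password out of the environment.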