首先,确保您已正确配置Airflow和Snowflake的相关内容。您可以尝试以下此代码示例对此问题进行处理:
在Snowflake安装包中,您需要安装以下组:
pip install snowflake-connector-python[pandas] pip install snowflake-connector-python[secure-local-storage]
安装完成后,在您Airflow的DAG文件中,您需要创建一个自定义operator并使用以下Python代码:
from airflow.operators.sensors import BaseSensorOperator from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults from airflow import DAG, logging from airflow.contrib.hooks.snowflake_hook import SnowflakeHook from airflow.operators.snowflake_operator import SnowflakeOperator
class SnowflakeConnectionSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, snowflake_conn_id, *args, **kwargs):
self.snowflake_conn_id = snowflake_conn_id
super(SnowflakeConnectionSensor, self).__init__(*args, **kwargs)
def poke(self, context):
hook = SnowflakeHook(self.snowflake_conn_id)
success = False
try:
conn = hook.get_conn()
cursor = conn.cursor()
cursor.execute('SELECT 1')
result = cursor.fetchone()
if result and result[0] == 1:
success = True
except Exception as e:
logging.error(e)
return success
dag = DAG('snowflake_connection_test', default_args=default_args) snowflake_conn_id = 'my_snowflake_conn'
snowflake_connection_sensor = SnowflakeConnectionSensor( task_id='snowflake_connection_sensor_task', dag=dag, snowflake_conn_id=snowflake_conn_id )
snowflake_query_task = SnowflakeOperator( task_id='snowflake_query_task', dag=dag, sql='SELECT 1', snowflake_conn_id=snowflake_conn_id )
snowflake_connection_sensor >> snowflake_query_task
将以上代码添加到您的DAG文件后,再次尝试连接Airflow和Snowflake,问题应该就可以得到解决了。