You can try adding the Snowflake connector dependency manually, so the job does not fail when it cannot download the connector at startup. Below is an example that adds the dependency:
import sys
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext.getOrCreate()
glue_context = GlueContext(sc)
job = Job(glue_context)
job.init(args['JOB_NAME'], args)

# AWS credentials for S3 access (fill in your own values)
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "")
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "")

# Add the Snowflake connector dependency
sc._jvm.com.databricks.addons.SnowflakeInitiator().initializeSparkSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

# Read from Snowflake into a DynamicFrame (fill in your own connection values)
dynamic_frame = glue_context.create_dynamic_frame.from_options(connection_type="snowflake", connection_options={
    "sfUrl": "",
    "sfUser": "",
    "sfPassword": "",
    "sfDatabase": "",
    "sfSchema": "",
    "sfWarehouse": ""
}, transformation_ctx="snowflake_output")

# Write the DynamicFrame out to S3 as CSV
glue_context.write_dynamic_frame.from_options(frame=dynamic_frame, connection_type="s3", connection_options={
    "path": "s3://",
    "partitionKeys": [""]
}, format="csv", transformation_ctx="s3_output")

job.commit()
In this code, the Snowflake connector dependency is added with the following line:
sc._jvm.com.databricks.addons.SnowflakeInitiator().initializeSparkSession(sc._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
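If that SnowflakeInitiator class is not available on your job's classpath, another common way to pin the connector is to declare it as a Maven package through the standard spark.jars.packages setting (or to upload the JARs to S3 and pass them with Glue's --extra-jars job parameter). The sketch below is only an illustration under those assumptions; the version strings are placeholders that you would replace with the releases matching your Glue Spark/Scala version:

# Minimal sketch: resolve the Snowflake Spark connector and JDBC driver from Maven
# at startup. Assumes the SparkContext has not been created yet and the job can
# reach Maven Central. Versions below are placeholders, not tested values.
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

snowflake_packages = ",".join([
    "net.snowflake:spark-snowflake_2.12:<connector-version>",  # placeholder version
    "net.snowflake:snowflake-jdbc:<jdbc-version>"               # placeholder version
])
conf = SparkConf().set("spark.jars.packages", snowflake_packages)
sc = SparkContext.getOrCreate(conf=conf)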
In addition, the code accesses S3 with an AWS access key pair; substitute your own credentials where the values are left blank.
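In many setups the Glue job's IAM role already grants S3 access, so the key pair is not strictly needed. If you do need it, one option is to pass the keys as job parameters instead of hard-coding them; a sketch using Glue's getResolvedOptions helper, where the S3_ACCESS_KEY and S3_SECRET_KEY parameter names are chosen here purely for illustration:

# Sketch: read the S3 key pair from job arguments (started with --S3_ACCESS_KEY and
# --S3_SECRET_KEY, hypothetical names) instead of embedding it in the script.
# Replaces the two hadoopConfiguration lines in the example above; `sc` is the
# SparkContext created there.
import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_ACCESS_KEY', 'S3_SECRET_KEY'])
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", args['S3_ACCESS_KEY'])
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", args['S3_SECRET_KEY'])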