The root cause is likely a mismatch between the table schema defined in the Glue Data Catalog and the schema of the data as it actually exists in the source. One way to work around this is to specify the input and output schemas explicitly in code rather than relying on the catalog definition. Below is a code example adapted from one found on GitHub:
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql.functions import col, split
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
# Here is where you specify the input source and output S3 path
input_job_path = "s3://my-bucket/input/"
output_job_path = "s3://my-bucket/output/"
input_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("value", DoubleType(), True)])
# Read the input data from S3 as a Glue DynamicFrame
dynamic_frame = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={
        "paths": [input_job_path],
        "recurse": True},
    format="csv",
    format_options={
        "withHeader": True,
        "separator": ","})

# Convert the DynamicFrame to a DataFrame and enforce the input schema explicitly
# (DynamicFrames infer their schema from the data, so this cast is what actually
# pins the columns to the types defined in input_schema above)
df = dynamic_frame.toDF()
df = df.select([col(f.name).cast(f.dataType).alias(f.name) for f in input_schema.fields])
# Apply transformations to the DataFrame. This is an illustrative example
# (adjust to your data): rename columns and split "name" so the result
# matches output_schema defined below
df_transformed = (df.withColumnRenamed("id", "new_id")
    .withColumn("first_name", split(col("name"), " ").getItem(0))
    .withColumn("last_name", split(col("name"), " ").getItem(1))
    .withColumnRenamed("value", "final_value")
    .drop("name"))
output_schema = StructType([
    StructField("new_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("final_value", DoubleType(), True)])
# Enforce the output schema explicitly, then write back to S3 as Parquet
df_transformed = df_transformed.select(
    [col(f.name).cast(f.dataType).alias(f.name) for f in output_schema.fields])
(df_transformed.write
    .mode("overwrite")
    .format("parquet")
    .option("compression", "snappy")
    .save(output_job_path))
# If "new_table_name" already exists in the Glue Catalog (database "default"),
# refresh Spark's cached metadata so the new files and schema are picked up
spark.catalog.refreshTable("default.new_table_name")
# Read the written Parquet back and convert it to a Glue DynamicFrame,
# e.g. for further Glue transforms or catalog-aware sinks
df_output = spark.read.format("parquet").load(output_job_path)
dynamic_frame_output = DynamicFrame.fromDF(df_output, glueContext, "dynamic_frame_output")
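
Note that spark.catalog.refreshTable only refreshes Spark's cached view of an already existing table; it does not create or update the table definition in the Glue Data Catalog. If you want the catalog entry (default.new_table_name above) to pick up the new schema without running a crawler, a Glue sink can update the catalog as part of the write. A minimal sketch, assuming a Glue 2.0 or later job and reusing the database, table name, and output path from the snippet above:

# Write the DynamicFrame through a catalog-updating sink so the table
# default.new_table_name is created or updated with the new schema
sink = glueContext.getSink(
    connection_type="s3",
    path=output_job_path,
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[])
sink.setCatalogInfo(catalogDatabase="default", catalogTableName="new_table_name")
sink.setFormat("glueparquet")
sink.writeFrame(dynamic_frame_output)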
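
Separately, if you only need the explicit input schema and do not otherwise need a DynamicFrame for the read, Spark's DataFrame reader accepts the schema directly at read time, which avoids the cast step shown earlier. A minimal sketch reusing input_schema and input_job_path from above:

# Read the CSV with the schema applied at read time instead of casting afterwards
df = (spark.read
    .format("csv")
    .option("header", "true")
    .option("sep", ",")
    .schema(input_schema)
    .load(input_job_path))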