- Import the required Python libraries:
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql.functions import col
- Configure the AWS Glue job parameters and initialize the job:
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
- Define the source and destination file paths:
source_path = "s3://source-bucket/path/to/parquet/files/"
destination_path = "s3://destination-bucket/path/to/parquet/files/"
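- (Optional) Rather than hardcoding the paths, they can be passed in as job parameters. A minimal sketch, assuming the job is configured with --SOURCE_PATH and --DEST_PATH arguments (both names are placeholders):
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'SOURCE_PATH', 'DEST_PATH'])
source_path = args['SOURCE_PATH']
destination_path = args['DEST_PATH']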
- Read the data:
in_dyf = glueContext.create_dynamic_frame_from_options(
    connection_type="s3", connection_options={"paths": [source_path]},
    format="parquet", transformation_ctx="my_source")
- Build the column-mapping transform:
applymapping1 = ApplyMapping.apply(frame=in_dyf, mappings=[
    # Map column names in the source to column names in the destination
    ("field_name_in_source", "string", "field_name_in_destination", "string"),
    ("field_name_in_source2", "string", "field_name_in_destination2", "string")
], transformation_ctx="applymapping1")
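- Note that ApplyMapping can also cast simple types in the same pass by specifying a different target type, for example a mapping tuple like ("count_in_source", "string", "count", "int"); the column names here are only illustrative.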
- Cast data types:
# Convert to a Spark DataFrame to cast a numeric column to decimal(10,2)
cast3 = applymapping1.toDF().withColumn(
    "numeric_column_name", col("numeric_column_name").cast("decimal(10,2)"))
- Write the data:
out_dyf = DynamicFrame.fromDF(cast3, glueContext, "out_dyf")
glueContext.write_dynamic_frame.from_options(
    frame=out_dyf,
    connection_type="s3",
    connection_options={"path": destination_path},
    format="parquet",
    transformation_ctx="out")
# Commit the job so that job bookmarks (if enabled) are updated
job.commit()