In an AWS Glue ETL job, you can use the following code example to remove partition key columns from files:
import boto3
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
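# Initialize the Glue and Spark contexts and an S3 client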
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
s3 = boto3.client('s3', region_name='us-east-1')
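# Resolve the parameters passed to the job run (--input_bucket, --input_prefix, etc.)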
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_bucket', 'input_prefix', 'output_bucket', 'output_prefix'])
input_bucket = args['input_bucket']
input_prefix = args['input_prefix']
output_bucket = args['output_bucket']
output_prefix = args['output_prefix']
# List every object under the input prefix (paginating past the 1,000-key limit of a single call)
files = []
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=input_bucket, Prefix=input_prefix):
    for obj in page.get('Contents', []):
        key = obj['Key']
        if not key.endswith('/'):  # skip folder placeholder objects
            files.append(key)
for file in files:
    # Read each CSV file into a DynamicFrame
    datasource0 = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": [f"s3://{input_bucket}/{file}"]},
        format="csv",
        format_options={"withHeader": True}
    )
    # Remove the partition key columns from the DynamicFrame
    data_without_partition_keys = datasource0.drop_fields(
        ['partitionColumn1', 'partitionColumn2'])
    # Strip the input prefix so it is not nested under the output prefix
    relative_key = file[len(input_prefix):].lstrip('/')
    # Write the modified DynamicFrame back to the output bucket and prefix.
    # Note: Glue treats this path as a directory and writes part files into it.
    glueContext.write_dynamic_frame.from_options(
        frame=data_without_partition_keys,
        connection_type="s3",
        connection_options={"path": f"s3://{output_bucket}/{output_prefix}/{relative_key}"},
        format="csv",
        format_options={"writeHeader": True}
    )
print("ETL job finished")
This job reads each file from the input S3 path, removes the partition key columns from the DynamicFrame, and then writes the modified DynamicFrame back to the output S3 path.
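If you want to trigger the job programmatically, here is a minimal sketch using boto3's start_job_run. The job name remove-partition-keys-job and the bucket/prefix values are hypothetical placeholders, not anything defined above; replace them with your own:

import boto3

glue = boto3.client('glue', region_name='us-east-1')
# JobName and the S3 locations below are placeholders for illustration only
response = glue.start_job_run(
    JobName='remove-partition-keys-job',
    Arguments={
        '--input_bucket': 'my-input-bucket',
        '--input_prefix': 'raw/data',
        '--output_bucket': 'my-output-bucket',
        '--output_prefix': 'cleaned/data',
    },
)
print(f"Started job run: {response['JobRunId']}")

getResolvedOptions reads each job argument by the name after the leading --, so the keys above match the parameter names the script expects; JOB_NAME is supplied to the run automatically by Glue.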