If you use AWS Glue Upsert as the data target, you may encounter the following error:
An error occurred while calling o199.pyWriteDynamicFrame. org.apache.spark.SparkException: Job aborted due to stage failure: Task 256 in stage 11.0 failed 4 times, most recent failure: Lost task 256.3 in stage 11.0 (TID 28195, ip-x-x-x-x.ec2.internal, executor 64): com.amazonaws.services.s3.model.AmazonS3Exception: IncompleteRequest: The request was aborted because it contains a null byte in the payload either before or after the XML data. For more information, see the MinIO logs.
To fix this issue, update the AWS Glue job's Python script to replace the Upsert target with a downstream S3 target in AWS Glue. Follow these steps:
1. Open the AWS Glue job's Python script.
2. Add the following code to the import section:
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions

# Resolve job parameters (JOB_NAME is required; file_format is used for the S3 write below)
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'file_format'])
job_name = args['JOB_NAME']

glueContext = GlueContext(SparkContext.getOrCreate())

# Write a DynamicFrame to S3 and return the DynamicFrame that was written
def write_to_s3(dynamic_frame, file_format, output_location):
    return glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type="s3",
        connection_options={
            "path": output_location,
            "compression": "gzip",
            "partitionKeys": []
        },
        format=file_format
    )
# Convert the merged Spark DataFrame to a DynamicFrame
df3 = DynamicFrame.fromDF(df_merged_to_1, glueContext, "nested")
# Write the merged data to S3.
write_to_s3(df3, args["file_format"], "s3://my-s3-bucket/path/to/output/")
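If the script was generated through the AWS Glue console, it will also contain Job bookkeeping around these calls. Below is a minimal sketch, building on the snippet above, of how the write fits between job.init() and job.commit(); the upstream read and merge that produce df_merged_to_1 are placeholders for your own job logic, and the bucket path is an example value.

from awsglue.job import Job

# Standard Glue job bookkeeping around the write
job = Job(glueContext)
job.init(job_name, args)

# ... upstream reads and the merge that produces df_merged_to_1 go here (placeholder) ...

# Convert the merged DataFrame and write it straight to S3, replacing the Upsert target
df3 = DynamicFrame.fromDF(df_merged_to_1, glueContext, "nested")
write_to_s3(df3, args["file_format"], "s3://my-s3-bucket/path/to/output/")

job.commit()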