出现该问题的原因是AWS Glue作业使用create_dynamic_frame.from_catalog时无法正确解析表中的分区投影信息,因此无法访问表中的数据。
为了解决这个问题,可以使用create_dynamic_frame.from_options方法来手动指定分区信息,而不是使用create_dynamic_frame.from_catalog方法。具体示例代码如下:
import sys

import boto3
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.types import *
# Resolve job arguments and bootstrap the Glue/Spark runtime.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark_context = SparkContext()
glueContext = GlueContext(spark_context)
spark = glueContext.spark_session

# Register this run as a Glue job so bookmarks/commit work.
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# --- Discover partition columns/values from the table's S3 key layout ---
s3 = boto3.client('s3')
tableName = "sample_table"

prefix = 'my_prefix/' + tableName + '/'

# Collect every key under the table prefix, relative to the prefix.
# Paginate: a single list_objects_v2 call returns at most 1000 keys,
# so larger tables would silently lose partitions without this.
partitions = []
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='my_bucket', Prefix=prefix):
    # 'Contents' is absent from the response when no object matches.
    for item in page.get('Contents', []):
        partitions.append(item['Key'][len(prefix):])

# Map partition column -> last seen value. A Hive-style key such as
# "year=2021/month=01/part-000.csv" yields {'year': '2021', 'month': '01'}.
# (The original split the whole key on "=", producing values like
# "2021/month" and crashing on keys without an "=".)
values = {}
for partition in partitions:
    for segment in partition.split('/'):
        # Only "key=value" path segments define partitions; skip
        # data-file names and any other segments.
        if '=' in segment:
            key, _, value = segment.partition('=')
            values[key] = value
# Read the partitioned CSV data directly from S3, bypassing the catalog
# (whose partition-projection settings Glue ETL cannot resolve).
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "path": "s3://my_bucket/my_prefix/sample_table/",
        # NOTE(review): "partitionKeys" is documented for Glue *writes*
        # (sinks); on a read it is most likely ignored — confirm against
        # the Glue S3 connection-options docs before relying on it.
        "partitionKeys": list(values.keys()),
        # Partition data lives in key=value subdirectories under the
        # prefix; without recurse the reader only sees the top level.
        "recurse": True,
    },
    format="csv",
    format_options={
        # Glue's CSV reader uses "withHeader" — Spark's "header" option
        # is not recognized by Glue format_options and was ignored.
        "withHeader": True,
        # NOTE(review): "inferSchema" is a Spark CSV option, not listed
        # among Glue's csv format options — kept for parity, but verify
        # it has any effect here.
        "inferSchema": True,
    },
)

# Finalize the job run (required for job bookmarks to advance).
job.commit()