AWS Glue Job JDBC数据源书签值修改
在AWS Glue Job中,我们可以在connection.getConnection()
中使用SparkSession配置JDBC数据源来读取和写入数据。然而,由于某些限制,JDBC数据源可能无法处理从现有的位置开始读取的情况,因此需要在每个步骤之间保留书签值,以避免数据重复读取或丢失。在AWS Glue中,可以使用标准引擎的类glueetlbookmarks.jar
和AWS内部库aws-glue-libs-for-scala
来进行书签管理。
以下是在AWS Glue Job中针对JDBC数据源的书签值修改代码示例:
# 导入所需的库和类
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
import sys, traceback
from pyspark.context import SparkContext
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, ArrayType, TimestampType, DateType
# 从命令行中获取参数
args = getResolvedOptions(sys.argv, [
'JOB_NAME',
'DB_JDBC_URL',
'DB_TABLE',
'CONNECTION_OPTIONS',
'BOOKMARK_BUCKET',
'BOOKMARK_PREFIX'
])
# 初始化GlueContext和SparkContext
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
# 从数据源读取数据
df = spark.read.format('jdbc').options(
url=args['DB_JDBC_URL'],
dbtable=args['DB_TABLE'],
driver='com.mysql.jdbc.Driver',
**args['CONNECTION_OPTIONS']).load()
# 在此处添加转换或处理数据的代码
# 创建DynamicFrame并将其写入目标
targetDf = DynamicFrame.fromDF(df, glueContext, 'targetDf')
glueContext.write_dynamic_frame.from_options(
frame=targetDf,
connection_type='s3',
connection_options={
'path': args['S3_TARGET_PATH']
},
format='parquet',
transformation_ctx='transform'
)
# 在此处更新书签值
bookmark = df.agg({'id': 'max'}).collect()[0][0]
glue_bookmark = glueContext.extract_bookmark()
if glue_bookmark:
if bookmark >= glue_bookmark:
glueContext.update_bookmark({'id': bookmark})
else:
glueContext.write_bookmark({'id': bookmark})
在此代码中,我们使用getResolvedOptions
获取AWS Glue Job传入的命令行参数,并初始化了GlueContext
和SparkContext
。接着,我们使用Spark JDBC连接器读取数据源,创建DynamicFrame,并使用write_dynamic_frame.from_options
将DynamicFrame写入S3桶中。最后,我们使用df.agg({'id': 'max'}).collect()[0][0]
获取数据源中最大的ID,并使用GlueContext
API更新书签值。
这是针对AWS Glue Job中JDBC数据源书签管理的一种解决方案,我们可以使用extract_bookmark
、write_bookmark
和update_bookmark
API管理书签值,并避免数据的重复读取或丢失。