AWSGlueJobbookmarkvaluemodificationforJDBCsources
创始人
2024-09-25 15:31:22
0

AWS Glue Job JDBC数据源书签值修改

在AWS Glue Job中,我们可以在connection.getConnection()中使用SparkSession配置JDBC数据源来读取和写入数据。然而,由于某些限制,JDBC数据源可能无法处理从现有的位置开始读取的情况,因此需要在每个步骤之间保留书签值,以避免数据重复读取或丢失。在AWS Glue中,可以使用标准引擎的类glueetlbookmarks.jar和AWS内部库aws-glue-libs-for-scala来进行书签管理。

以下是在AWS Glue Job中针对JDBC数据源的书签值修改代码示例:

# 导入所需的库和类
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions

import sys, traceback

from pyspark.context import SparkContext
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, ArrayType, TimestampType, DateType

# 从命令行中获取参数
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'DB_JDBC_URL',
    'DB_TABLE',
    'CONNECTION_OPTIONS',
    'BOOKMARK_BUCKET',
    'BOOKMARK_PREFIX'
])

# 初始化GlueContext和SparkContext
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

# 从数据源读取数据
df = spark.read.format('jdbc').options(
        url=args['DB_JDBC_URL'],
        dbtable=args['DB_TABLE'],
        driver='com.mysql.jdbc.Driver',
        **args['CONNECTION_OPTIONS']).load()

# 在此处添加转换或处理数据的代码

# 创建DynamicFrame并将其写入目标
targetDf = DynamicFrame.fromDF(df, glueContext, 'targetDf')

glueContext.write_dynamic_frame.from_options(
    frame=targetDf,
    connection_type='s3',
    connection_options={
        'path': args['S3_TARGET_PATH']
    },
    format='parquet',
    transformation_ctx='transform'
)

# 在此处更新书签值
bookmark = df.agg({'id': 'max'}).collect()[0][0]
glue_bookmark = glueContext.extract_bookmark()
if glue_bookmark:
    if bookmark >= glue_bookmark:
        glueContext.update_bookmark({'id': bookmark})
else:
    glueContext.write_bookmark({'id': bookmark})

在此代码中,我们使用getResolvedOptions获取AWS Glue Job传入的命令行参数,并初始化了GlueContextSparkContext。接着,我们使用Spark JDBC连接器读取数据源,创建DynamicFrame,并使用write_dynamic_frame.from_options将DynamicFrame写入S3桶中。最后,我们使用df.agg({'id': 'max'}).collect()[0][0]获取数据源中最大的ID,并使用GlueContext API更新书签值。

这是针对AWS Glue Job中JDBC数据源书签管理的一种解决方案,我们可以使用extract_bookmarkwrite_bookmarkupdate_bookmark API管理书签值,并避免数据的重复读取或丢失。

相关内容

热门资讯

安卓换鸿蒙系统会卡吗,体验流畅... 最近手机圈可是热闹非凡呢!不少安卓用户都在议论纷纷,说鸿蒙系统要来啦!那么,安卓手机换上鸿蒙系统后,...
app安卓系统登录不了,解锁登... 最近是不是你也遇到了这样的烦恼:手机里那个心爱的APP,突然就登录不上了?别急,让我来帮你一步步排查...
安卓系统拦截短信在哪,安卓系统... 你是不是也遇到了这种情况:手机里突然冒出了很多垃圾短信,烦不胜烦?别急,今天就来教你怎么在安卓系统里...
安卓系统要维护多久,安卓系统维... 你有没有想过,你的安卓手机里那个陪伴你度过了无数日夜的安卓系统,它究竟要陪伴你多久呢?这个问题,估计...
windows官网系统多少钱 Windows官网系统价格一览:了解正版Windows的购买成本Windows 11官方价格解析微软...
安卓系统如何卸载app,轻松掌... 手机里的App越来越多,是不是感觉内存不够用了?别急,今天就来教你怎么轻松卸载安卓系统里的App,让...
怎么复制照片安卓系统,操作步骤... 亲爱的手机控们,是不是有时候想把自己的手机照片分享给朋友,或者备份到电脑上呢?别急,今天就来教你怎么...
安卓系统应用怎么重装,安卓应用... 手机里的安卓应用突然罢工了,是不是让你头疼不已?别急,今天就来手把手教你如何重装安卓系统应用,让你的...
iwatch怎么连接安卓系统,... 你有没有想过,那款时尚又实用的iWatch,竟然只能和iPhone好上好?别急,今天就来给你揭秘,怎...
iphone系统与安卓系统更新... 最近是不是你也遇到了这样的烦恼?手机更新系统总是失败,急得你团团转。别急,今天就来给你揭秘为什么iP...