在AWS Glue中,可以使用Job Bookmark功能来记住作业的执行进度,并在需要时恢复进度。默认情况下,每个作业只能有一个书签。但是,我们可以通过在代码中手动控制书签来实现在同一作业中创建多个书签。具体实现方法如下:
在AWS Glue作业代码中,使用“glue_context.get_job_bookmark()”获取当前作业的书签,然后传递给“extractor”和“transformer”函数或类作为参数。在数据提取和转换过程中,我们可以使用此书签来跟踪当前的执行进度,并在执行完成后更新书签。
以下是一个简单的示例,用于在同一AWS Glue作业中创建名为“bookmark1”和“bookmark2”的两个书签:
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# 获取当前作业的书签
bookmark1 = glueContext.get_job_bookmark('bookmark1')
bookmark2 = glueContext.get_job_bookmark('bookmark2')
# 如果书签为空,则从头开始执行
if bookmark1 is None:
bookmark1 = '0'
if bookmark2 is None:
bookmark2 = '0'
# 执行数据提取和转换,使用书签跟踪进度
df1 = glueContext.create_dynamic_frame.from_catalog(database="my_database", table_name="my_table1", transformation_ctx="df1", bookmark=bookmark1)
df2 = glueContext.create_dynamic_frame.from_catalog(database="my_database", table_name="my_table2", transformation_ctx="df2", bookmark=bookmark2)
# 更新书签
glueContext.set_job_bookmark('bookmark1', df1.last_record['timestamp'])
glueContext.set_job_bookmark('bookmark2', df2.last_record['timestamp'])
# 执行数据输出
glueContext.write_dynamic_frame.from_options(frame=df1, connection_type="s3", connection_options={"path": "s3://my_bucket/output1"}, format="parquet")
glueContext.write_dynamic_frame.from_options(frame=df2, connection_type="s3", connection_options={"path": "s3://my_bucket/output2"}, format="json")
job.commit()
``