AWS Glue Spark作业书签可以帮助我们跟踪Spark作业的状态,并在需要时恢复失败的作业。但是,它并不会自动重新处理失败的作业。如果您想要重新处理失败的作业,您需要修改您的Spark作业代码以支持重新处理失败的作业,并将完成的作业状态保存到您的AWS Glue作业书签中。以下是一个Python Spark作业示例,其中包含重新处理失败作业的代码:
from pyspark.sql import SparkSession
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
from awsglue import DynamicFrame
from awsglue.context import GlueContext
from awsglue.jobbookmark import JobBookmark
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
job_bookmark = JobBookmark(glueContext)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# 自定义函数,用于重新处理失败的行
def process_row(row):
# do something to process the row
return row
# 获取上一个作业的结束位置
last_end_pos = job_bookmark.get()
# 用DynamicFrame读取上一个作业的输出数据
input_data = glueContext.create_dynamic_frame.from_options(connection_type='s3', connection_options={'path': input_path}, format='csv', format_options={'separator': ',', 'header': True}, transformation_ctx='input_data')
# 检查DynamicFrame是否分区
is_partitioned = input_data.is_partitioned()
# 如果之前的作业有结束位置,则从此处开始处理
if last_end_pos:
start_pos = last_end_pos + 1
input_data = input_data.filter(lambda row: row['__record_index'] >= start_pos)
# 将DynamicFrame转换为DataFrame以进行处理
df = input_data.toDF()
# 对每行执行自定义处理函数以重新处理失败的行
processed_df = df.rdd.map(process_row).toDF()
#