AWS Glue的Custom Transform功能需要在转换脚本中明确指定摄取时间列,然后才能在输出模式中看到它。以下是指定摄取时间列的代码示例:
from awsglue.transforms import ApplyMapping, AddColumns, DropFields, RenameField
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from datetime import datetime
glueContext = GlueContext(SparkContext.getOrCreate())
# 读取数据源
datasource0 = glueContext.create_dynamic_frame.from_catalog(database="my-db", table_name="my-table")
# 获取当前时间
utc_timestamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
# 定义摄取时间列
inputcols = [("col1", "string"), ("col2", "long")]
outputcols = [("col1", "string"), ("col2", "long"), ("ingestion_time", "string")]
# 添加摄取时间列
dfc0 = datasource0.toDF()
dfc1 = dfc0.withColumn("ingestion_time", lit(utc_timestamp))
datasource = DynamicFrame.fromDF(dfc1, glueContext, "datasource")
# 执行自定义转换
custom_transform = ApplyMapping.apply(frame=datasource, mappings=inputcols, transformation_ctx="custom_transform")
在以上代码示例中,使用了AddColumns和lit函数将当前时间添加到DynamicFrame,然后将其转换为DataFrame,最后使用DynamicFrame.fromDF将结果转换回DynamicFrame,并在输出模式中添加了ingestion_time列。
这样,就能在Custom Transform输出模式中看到ingestion_time列了。