AWS Glue Schema Registry 支持在同一 Kafka 主题中使用多个模式。我们需要在 AWS Glue 管理台上为每个模式创建注册表,并在 Glue Job 中指定注册表以及适用的模式。
以下是一个 Python 脚本示例,演示如何在同一 Kafka 主题中使用两个不同的模式:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# 读取 Kafka 主题中的数据,使用第一个模式解析
source = glueContext.create_dynamic_frame_from_options(
"kafka", {
"topics": "my_topic",
"kafka.bootstrap.servers": "my-kafka-broker:9092",
"schema.registry.url": "http://my-schema-registry:8081",
"schema.registry.topic": "my_topic",
"schema.registry.schema.name": "my_first_schema"
}
)
# 使用第二个模式解析数据
mapped = ApplyMapping.apply(frame=source,
mappings=[("field1", "string", "new_field1"),
("field2", "string", "new_field2")],
transformation_ctx="mapped")
sink = glueContext.getSink(
"kafka",
{
"bootstrap.servers": "my-kafka-broker:9092",
"schema.registry.url": "http://my-schema-registry:8081",
"schema.registry.topic": "my_topic",
"schema.registry.schema.name": "my_second_schema"
},
transformation_ctx="sink"
)
# 将第二个模式解析后的数据写回到 Kafka 主题
sink.writeFrame(mapped)
在上面的示例中,我们使用第一个模式解析 Kafka 主题中的数据,然后使用第二个模式将转换后的数据写