在 Lambda 函数处理程序中添加以下代码示例来解决该问题:
import base64
def lambda_handler(event, context):
for record in event['Records']:
payload = record['kafka']['payload']
headers = record['kafka']['headers']
for h in headers:
if h['key'] == 'message-id':
message_id = h['value']
break
payload_bytes = base64.b64decode(payload)
# Use message_id and payload_bytes for further processing
在上面的代码中,我们首先使用 base64 解码 Kafka 消息体,然后迭代 Kafka 消息头以获取消息 ID,并使用它来进一步处理消息。注意,在消费 Kafka 消息时,您应该始终使用字节数组而不是字符串。