此错误通常表示AWS Firehose无法从输入数据中提取元数据,进而导致分区失败。可能的解决方法是检查数据格式是否正确,确保所有必需的元数据都存在并且格式正确。具体来说,可以使用以下代码示例来确保传输的JSON数据格式正确且包含必需的元数据:
import json
import boto3
from datetime import datetime
firehose = boto3.client('firehose')
def lambda_handler(event, context):
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
data = {
"id": event.get("id", "NULL"),
"name": event.get("name", "NULL"),
"timestamp": timestamp
}
try:
response = firehose.put_record(
DeliveryStreamName='my-delivery-stream',
Record={
'Data': json.dumps(data) + '\n'
}
)
print(response)
except Exception as e:
print(e)
raise e
在此示例中,我们使用Python客户端将数据传输到名为“my-delivery-stream”的AWS Firehose交付流。在lambda_handler函数中,我们首先使用datetime库生成当前时间戳,并将必需的数据放入一个Python字典中。我们随后将Python字典转换为JSON字符串,并通过put_record函数将其发送到AWS Firehose。如果在传输过程中出现任何错误,我们会将其打印并抛出异常。通过这种方式,我们可以确保我们传输的JSON数据格式正确且包含必需的元数据,从而避免DynamicPartitioning.MetadataExtractionFailed错误。