If your JSON data exceeds the AWS Kinesis Firehose put_record limit (roughly 1 MB, i.e. 1,000 KiB per record), you can use the put_record_batch API to send multiple records in a single call. Here is a code example:
```python
import boto3
import json

client = boto3.client('firehose', region_name='us-east-1')


def upload_records(records):
    """Send one batch via put_record_batch and collect any failed records."""
    response = client.put_record_batch(
        DeliveryStreamName='my_delivery_stream_name',
        Records=records,
    )
    if response['FailedPutCount'] > 0:
        failed_records = []
        for idx, res in enumerate(response['RequestResponses']):
            # Entries carrying an ErrorCode were not ingested
            if 'ErrorCode' in res:
                failed_records.append(records[idx])
        # Handle the failed records
        handle_failed_records(failed_records)


def handle_failed_records(failed_records):
    # Decide how to handle failed records (e.g., retry them)
    pass


def append_to_batch(records, batch_bytes, chunk):
    """Append one record, sending the batch first if adding it would exceed
    the put_record_batch limits (500 records / 4 MiB per call)."""
    if len(records) == 500 or batch_bytes + len(chunk) > 4_000_000:
        upload_records(records)
        records, batch_bytes = [], 0
    # Each entry in Records must be a dict with a 'Data' blob
    records.append({'Data': chunk})
    return records, batch_bytes + len(chunk)


data = {}
records, batch_bytes = [], 0
for record in list_of_records:
    merged = {**data, **record}
    if len(json.dumps(merged).encode('utf-8')) > 1_000_000 and data:
        # The merged record would exceed the ~1 MB per-record limit:
        # flush the accumulated data as one record and start a new one.
        # (A single record that is itself over the limit must be split
        # or compressed upstream.)
        records, batch_bytes = append_to_batch(
            records, batch_bytes, json.dumps(data).encode('utf-8'))
        data = record
    else:
        data = merged
if data:
    # Flush whatever is still accumulated
    records, batch_bytes = append_to_batch(
        records, batch_bytes, json.dumps(data).encode('utf-8'))
if records:
    upload_records(records)  # send the remaining records
```
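One caveat with this approach: merging records with dict semantics (`{**data, **record}`, equivalent to the original `data.update(record)`) silently overwrites duplicate keys. If your records can share keys, accumulate them in a list and serialize each chunk as newline-delimited JSON instead.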
Note that put_record_batch does not take a batch-size parameter; you control the effective batch size by how many records you pack into each call, up to the API limits of 500 records and 4 MiB per call, with each record's data blob capped at 1,000 KiB. Larger batches mean fewer API round trips, but be careful: changing the batch size can affect throughput, and you still need to stay within the delivery stream's quotas.
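A nonzero FailedPutCount indicates a partial failure: Firehose can reject individual records (typically throttling, reported per record via ErrorCode) while accepting the rest. A common pattern is to retry only the failed records with jittered exponential backoff. Below is a minimal sketch of what handle_failed_records could look like; the max_attempts parameter and the dead-letter comment are illustrative assumptions, not part of the original answer.

```python
import random
import time

import boto3

client = boto3.client('firehose', region_name='us-east-1')


def handle_failed_records(failed_records, max_attempts=3):
    """Retry failed records with jittered exponential backoff (sketch)."""
    for attempt in range(1, max_attempts + 1):
        if not failed_records:
            return
        # Back off before retrying so throttled records have time to clear
        time.sleep(2 ** attempt + random.random())
        response = client.put_record_batch(
            DeliveryStreamName='my_delivery_stream_name',
            Records=failed_records,
        )
        # Keep only the records that failed again
        failed_records = [
            failed_records[idx]
            for idx, res in enumerate(response['RequestResponses'])
            if 'ErrorCode' in res
        ]
    # Records still failing after max_attempts could be written to a
    # dead-letter location (e.g. S3) for later inspection
```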