为了解决这个问题,我们可以使用BigQuery批量插入操作来替代流式插入操作。以下是示例代码:
from google.cloud import bigquery
from google.oauth2 import service_account
# 配置您的密钥文件路径
credentials = service_account.Credentials.from_service_account_file(
'path/to/your/service/account/key.json'
)
# 初始化 BigQuery客户端
client = bigquery.Client(credentials=credentials, project=credentials.project_id)
# 设置您的目标表的名称和架构
table_id = 'your-project.your_dataset.your_table'
schema = [
bigquery.SchemaField('name', 'STRING'),
bigquery.SchemaField('age', 'INTEGER'),
# (您可以添加更多字段,具体取决于您的表架构)
]
# 批量插入数据
rows_to_insert = [
('Alice', 23),
('Bob', 45),
('Charlie', 67),
# (您可以添加更多行,具体取决于您的数据量)
]
errors = client.insert_rows(table_id, rows_to_insert, schema=schema)
# 检查错误列表 (如果有)
if errors == []:
print('数据已成功插入表格。')
else:
print(f'发生错误: {errors}')
使用BigQuery批量插入操作,可以显著提高插入大量数据的速度和效率,从而更好地应对高负载情况。