To ensure that BigQuery streaming inserts succeed, the data should be sent in smaller batches. Here is an example in Python:
from google.cloud import bigquery
# Initialize a client object
client = bigquery.Client()
# Define the target dataset and table
table_id = "myproject.mydataset.mytable"
# Define the rows of data to insert
rows_to_insert = [
{"column1": "value1", "column2": "value2"},
{"column1": "value3", "column2": "value4"},
# More rows...
]
# Define the size of batches to insert
batch_size = 1000
# Insert the rows in batches
# Insert the rows in batches
for i in range(0, len(rows_to_insert), batch_size):
    batch = rows_to_insert[i:i+batch_size]
    errors = client.insert_rows_json(table_id, batch)
    if errors:
        print("Errors occurred while inserting rows: {}".format(errors))
    else:
        print("Rows inserted successfully.")
By streaming the data in batches of 1,000 rows, each request stays small, which avoids the problem of BigQuery being unable to load all of the data in a single streaming call.
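If a batch has to be retried (for example after a transient network error), duplicate rows can be reduced by attaching an explicit insert ID to each row. Below is a minimal sketch, assuming the same table_id and row format as above and using the row_ids parameter of insert_rows_json in the google-cloud-bigquery client; insert_in_batches is a hypothetical helper name:

from google.cloud import bigquery

client = bigquery.Client()
table_id = "myproject.mydataset.mytable"  # same hypothetical table as above

def insert_in_batches(rows, batch_size=1000):
    """Stream rows in batches, attaching a deterministic insert ID to each row
    so BigQuery can best-effort deduplicate a retried batch."""
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        # Deterministic IDs: a retry of the same batch sends the same IDs,
        # which lets BigQuery drop duplicates within its deduplication window.
        row_ids = ["row-{}".format(start + offset) for offset in range(len(batch))]
        errors = client.insert_rows_json(table_id, batch, row_ids=row_ids)
        if errors:
            print("Errors in batch starting at row {}: {}".format(start, errors))
        else:
            print("Batch starting at row {} inserted successfully.".format(start))

Note that this deduplication is best-effort and only applies for a short time window, so it complements rather than replaces any downstream deduplication you may need.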