在进行数据流摄取时,应该在BigQuery Schema中使用STRUCT来定义Postgres数组的字段,并且使用REPEATED来表示数组中的元素。
以下是一个Python示例代码,演示如何在数据流摄取期间正确地定义Schema,以保留Postgres数组字段:
from google.cloud import bigquery
client = bigquery.Client()
# 构建Schema
schema = [
bigquery.SchemaField('id', 'STRING', mode='REQUIRED'),
bigquery.SchemaField('name', 'STRING', mode='REQUIRED'),
bigquery.SchemaField(
'array_field', 'STRUCT', mode='NULLABLE',
fields=[
bigquery.SchemaField('value', 'STRING', mode='REPEATED',
sub_fields=[
bigquery.SchemaField('sub_field', 'INTEGER', mode='REQUIRED'),
])
])
]
# 定义数据流摄取配置
stream_config = bigquery.StreamingInsertAllRequest(rows=[{
'id': '1',
'name': 'test',
'array_field': {
'value': [
{'sub_field': 1},
{'sub_field': 2},
{'sub_field': 3},
]
}
}],
skip_invalid_rows=False,
ignore_unknown_values=True)
# 执行数据流摄取
table_id = 'my_project.my_dataset.my_table'
table_ref = client.dataset('my_dataset').table('my_table')
streaming_buffer_size = 10 * 1024 * 1024 # 10MB
with client.stream_from(
table_ref,
schema=schema,
streaming_buffer_size=streaming_buffer_size) as stream:
stream.insert_rows(stream_config.rows)
在上面的示例代码中,用STRUCT和REPEATED定义了包含Postgres数组的字段。在执行数据流摄取时,使用了正确的Schema来保留该字段。