要在BigQuery中使用Array<String>类型的列(即模式为REPEATED的STRING字段),并通过Apache Beam向其写入数据,可以参考下面的示例代码:
from google.cloud import bigquery
# Create the BigQuery client.
client = bigquery.Client()

# Table schema: a required string id plus a REPEATED string column, which is
# how BigQuery represents ARRAY<STRING>.
schema = [
    bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("string_array", "STRING", mode="REPEATED"),
]

# Create the table. The schema has to be attached to a Table object:
# the second positional parameter of create_table() is `exists_ok`, not a
# schema, so passing the schema there would create a table with no columns.
# exists_ok=True keeps the script re-runnable when the table already exists.
table_ref = client.create_table(
    bigquery.Table("project_id.dataset_id.example_table", schema=schema),
    exists_ok=True,
)
import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
class BuildArrayString(beam.DoFn):
    """Turn each input element into a row carrying a list of strings."""

    def process(self, element):
        """Yield one output row for ``element``.

        Args:
            element: Mapping that provides an ``'id'`` entry.

        Yields:
            A dict with the element's ``'id'`` and a ``'string_array'`` list
            of three fixed strings.

        Raises:
            KeyError: If ``element`` has no ``'id'`` entry.
        """
        array_string = ['string1', 'string2', 'string3']
        # WriteToBigQuery consumes plain dictionaries keyed by column name;
        # a Python list is the value for a REPEATED column.
        yield {
            'id': element['id'],
            'string_array': array_string,
        }
# Build and run the Apache Beam pipeline.
with beam.Pipeline() as p:
    # Read the input elements (an in-memory source for this example).
    data = p | beam.Create([{'id': '1'}, {'id': '2'}, {'id': '3'}])

    # Transform the elements and write them to BigQuery.
    # The schema is given as a dictionary because the compact
    # 'name:TYPE,...' string form cannot express field modes (every field
    # becomes NULLABLE), and string_array must be REPEATED to hold a list.
    data | beam.ParDo(BuildArrayString()) | beam.io.WriteToBigQuery(
        'project_id:dataset_id.example_table',
        schema={
            'fields': [
                {'name': 'id', 'type': 'STRING', 'mode': 'REQUIRED'},
                {'name': 'string_array', 'type': 'STRING', 'mode': 'REPEATED'},
            ]
        },
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
上述代码中,BuildArrayString类继承自beam.DoFn,并用于构建包含Array<String>字段(string_array)的行。
然后,我们使用ParDo转换将数据转换为具有Array<String>字段的行,并通过WriteToBigQuery将其写入BigQuery表。
请注意,您需要将'project_id:dataset_id.example_table'替换为您实际的项目ID、数据集ID和表名。确保您的代码中已经设置了正确的身份验证凭据,以便访问BigQuery。