这可能是由于凭据问题所致。您可以通过更新您的Google Cloud SDK安装包并重新验证您的凭据来解决这个问题。如果更新SDK后问题仍然存在,则可以将服务帐号密钥文件传递给Beam管道以进行身份验证。以下是一个示例代码:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
project_id = 'your_project_id'
key_file = 'path/to/your/key_file.json'
table_spec = 'project_id:dataset_id.table_id'
schema = 'column1:STRING,column2:INTEGER'
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.service_account = key_file
p = beam.Pipeline(options=options)
data = [{'column1': 'value1', 'column2': 1},
{'column1': 'value2', 'column2': 2}]
(p | beam.Create(data)
| beam.io.WriteToBigQuery(table_spec,
schema=schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
p.run().wait_until_finish()
在上述示例中,您需要将“your_project_id”替换为您的Google Cloud项目ID,“path/to/your/key_file.json”替换为您的服务帐号密钥文件的路径,“project_id:dataset_id.table_id”替换为您的表格规范,以及“column1:STRING,column2:INTEGER”替换为您的表格架构。此外,您还需要根据实际情况调整表的创建和写入行为。