A possible cause is that the Python SDK does not support using a BigQuery table directly as the destination for a Pub/Sub topic. As a workaround, you can run the pipeline on Dataflow's Streaming Engine and use a custom PubsubToBigQuery DoFn to write the Pub/Sub messages into the BigQuery table.

Here is an example:
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from google.cloud import bigquery

class PubsubToBigQuery(beam.DoFn):
    def __init__(self, dataset, table):
        self.dataset = dataset
        self.table = table

    def start_bundle(self):
        # Create the client per bundle: it is not picklable, so it
        # cannot be created in __init__.
        self.bigquery_client = bigquery.Client()
        # insert_rows_json accepts a "dataset.table" string as the target;
        # the project is taken from the client's default project.
        self.table_id = "{}.{}".format(self.dataset, self.table)

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        try:
            row = {
                "column1": element["column1"],
                "column2": element["column2"],
                # Beam timestamps are not JSON-serializable; convert to an
                # ISO 8601 string, which BigQuery's TIMESTAMP type accepts.
                "timestamp": timestamp.to_utc_datetime().isoformat(),
            }
            errors = self.bigquery_client.insert_rows_json(
                self.table_id, [row], ignore_unknown_values=True)
            if errors:
                raise RuntimeError(errors)
            yield element["column1"]
        except Exception:
            # Swallowing the exception drops the message; in production you
            # would log it or route it to a dead-letter output instead.
            return

def run():
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=pipeline_options)
    (p
     | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(
         subscription="projects/{project}/subscriptions/{subscription}")
     | "Transform message" >> beam.Map(json.loads)
     | "Write to BigQuery" >> beam.ParDo(
         PubsubToBigQuery(dataset="my_dataset", table="my_table")))
    p.run().wait_until_finish()

if __name__ == "__main__":
    run()
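Note that the example constructs bare PipelineOptions, so it runs on whichever runner the defaults resolve to (usually the DirectRunner). To actually run it on Dataflow with Streaming Engine, as suggested above, the Dataflow options must be set as well. A minimal sketch; the project, region, and bucket values below are placeholders, not values from the original question:

from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

pipeline_options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",                # placeholder GCP project ID
    region="us-central1",                # placeholder Dataflow region
    temp_location="gs://my-bucket/tmp",  # placeholder staging bucket
    enable_streaming_engine=True,        # turn on Dataflow Streaming Engine
)
pipeline_options.view_as(StandardOptions).streaming = True

The same settings can instead be passed on the command line (--runner, --project, --region, --temp_location, --enable_streaming_engine).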
In the pipeline above, messages are read from the specified Pub/Sub subscription, each message is parsed as JSON, and the result is passed to the PubsubToBigQuery DoFn, which streams the transformed row into the specified BigQuery table. The target table's schema must match the fields the pipeline writes (column1, column2, timestamp), as sketched below.
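insert_rows_json does not create the table, so it must already exist before the pipeline starts. A minimal sketch of creating it with the Python client, assuming column1 and column2 are strings (their actual types are not given in the question):

from google.cloud import bigquery

client = bigquery.Client()
schema = [
    bigquery.SchemaField("column1", "STRING"),      # assumed type
    bigquery.SchemaField("column2", "STRING"),      # assumed type
    bigquery.SchemaField("timestamp", "TIMESTAMP"),
]
client.create_table(bigquery.Table("my-project.my_dataset.my_table", schema=schema))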