import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import BigQuerySink
class User:
    """Plain record describing one user by ``id``, ``name`` and ``email``."""

    def __init__(self, id, name, email):
        """Store the three constructor arguments as same-named attributes."""
        # ``id`` shadows the builtin, but it is part of the keyword API that
        # callers use (``User(id=1, ...)``), so the parameter name is kept.
        self.id, self.name, self.email = id, name, email
class UserPipeline:
    """Beam pipeline that writes user records to a BigQuery table.

    The destination table is identified by the ``PROJECT_ID``,
    ``DATASET_ID`` and ``TABLE_NAME`` class attributes, and its columns by
    ``SCHEMA``. Override them (for example in a subclass) to target a real
    project.
    """

    # Placeholder destination coordinates; replace with real values.
    PROJECT_ID = 'your-project-id'
    DATASET_ID = 'your-dataset-id'
    TABLE_NAME = 'users'
    # Column layout of the destination table as "name:TYPE,..." pairs.
    SCHEMA = "id:INTEGER,name:STRING,email:STRING"

    def __init__(self, pipeline_args=None):
        """Build the pipeline options and the pipeline object.

        Args:
            pipeline_args: Optional list of command-line style flags that is
                forwarded to ``PipelineOptions``.
        """
        self.pipeline_options = PipelineOptions(pipeline_args)
        self.pipeline = beam.Pipeline(options=self.pipeline_options)

    @staticmethod
    def _user_to_row(user):
        """Return *user* as a dict keyed by the ``SCHEMA`` column names.

        Dict rows are passed through as a shallow copy; any other object is
        read through its ``id``, ``name`` and ``email`` attributes.
        """
        if isinstance(user, dict):
            return dict(user)
        return {"id": user.id, "name": user.name, "email": user.email}

    def run(self, users):
        """Write *users* to the configured BigQuery table and wait for it.

        Args:
            users: Iterable of dict rows or of objects exposing ``id``,
                ``name`` and ``email`` attributes.

        Raises:
            AttributeError: If a non-dict element lacks one of the three
                attributes.
        """
        # The BigQuery write transform expects dictionaries keyed by column
        # name, so convert up front instead of shipping arbitrary objects
        # through the pipeline.
        rows = [self._user_to_row(user) for user in users]
        (
            self.pipeline
            | "Create Beam PCollection" >> beam.Create(rows)
            # WriteToBigQuery is a PTransform and can be piped directly; the
            # deprecated BigQuerySink is a sink object, not a transform.
            | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                table=self.TABLE_NAME,
                dataset=self.DATASET_ID,
                project=self.PROJECT_ID,
                schema=self.SCHEMA,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
        # NOTE(review): batch writes may require a GCS ``temp_location`` in
        # the pipeline options - confirm for the runner in use.
        self.pipeline.run().wait_until_finish()
if __name__ == "__main__":
    # Demo entry point: build two sample users and push them through the
    # pipeline with default pipeline options.
    users = [
        User(id=user_id, name=user_name, email=user_email)
        for user_id, user_name, user_email in (
            (1, "John", "john@example.com"),
            (2, "Jane", "jane@example.com"),
        )
    ]
    demo_pipeline = UserPipeline()
    demo_pipeline.run(users)
注意,此示例中引用的表具有 id、name 和 email 三列,并且在BigQuery中已经正确