对于需要将存储与查询分离的项目,可以采用以下解决方法(使用 Google Cloud Storage 存储数据,使用 BigQuery 执行查询):
from google.cloud import storage
def create_bucket(bucket_name):
    """Create a Cloud Storage bucket and print its name.

    Args:
        bucket_name: Name to give the new bucket.
    """
    client = storage.Client()
    new_bucket = client.create_bucket(bucket_name)
    message = 'Bucket {} created'.format(new_bucket.name)
    print(message)


# Example invocation.
create_bucket('my-bucket')
from google.cloud import storage
def upload_file(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket and print a confirmation.

    Args:
        bucket_name: Name of the bucket to upload into.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name for the file inside the bucket.
    """
    client = storage.Client()
    target_blob = client.get_bucket(bucket_name).blob(destination_blob_name)
    target_blob.upload_from_filename(source_file_name)
    message = 'File {} uploaded to {}.'.format(
        source_file_name, destination_blob_name
    )
    print(message)


# Example invocation.
upload_file('my-bucket', 'data.csv', 'data/data.csv')
from google.cloud import bigquery
def create_dataset(dataset_name):
    """Create a BigQuery dataset and print its ID.

    Args:
        dataset_name: ID of the dataset to create.
    """
    client = bigquery.Client()
    created = client.create_dataset(dataset_name)
    message = 'Dataset {} created'.format(created.dataset_id)
    print(message)


# NOTE(review): BigQuery dataset IDs may only contain letters, digits and
# underscores, so the hyphenated placeholder below is likely to be rejected
# by the service; substitute a valid ID (for example 'my_dataset') here and
# wherever else the dataset is referenced before running.
create_dataset('my-dataset')
from google.cloud import bigquery
def load_data_from_gcs(dataset_name, table_name, source_uri):
    """Load a CSV file from Cloud Storage into a BigQuery table.

    The first row of the file is skipped as a header row and the table
    schema is auto-detected. The call blocks until the load job finishes.

    Args:
        dataset_name: ID of the destination dataset (in the client's project).
        table_name: ID of the destination table.
        source_uri: ``gs://`` URI of the CSV file to load.
    """
    bigquery_client = bigquery.Client()
    # Build the reference explicitly: Client.dataset() is deprecated in
    # favour of DatasetReference.
    dataset_ref = bigquery.DatasetReference(
        bigquery_client.project, dataset_name
    )
    table_ref = dataset_ref.table(table_name)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
    )
    load_job = bigquery_client.load_table_from_uri(
        source_uri, table_ref, job_config=job_config
    )
    # Wait for the load job to complete before reporting success.
    load_job.result()
    print('Data loaded from {} to {}.{}'.format(
        source_uri,
        dataset_name,
        table_name
    ))


# NOTE(review): 'my-dataset' is a placeholder; BigQuery dataset IDs may only
# contain letters, digits and underscores, so use a valid ID when running.
load_data_from_gcs('my-dataset', 'my-table', 'gs://my-bucket/data/data.csv')
from google.cloud import bigquery
def run_query(dataset_name, query, location='US'):
    """Run a query and save its result to the dataset's ``query_results`` table.

    Any existing contents of the destination table are overwritten. The call
    blocks until the query job finishes.

    Args:
        dataset_name: ID of the dataset (in the client's project) that holds
            the ``query_results`` destination table.
        query: SQL text to execute.
        location: Location in which to run the job. Defaults to ``'US'``.
    """
    bigquery_client = bigquery.Client()
    # Build the reference explicitly: Client.dataset() is deprecated in
    # favour of DatasetReference.
    dataset_ref = bigquery.DatasetReference(
        bigquery_client.project, dataset_name
    )
    job_config = bigquery.QueryJobConfig(
        destination=dataset_ref.table('query_results'),
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    query_job = bigquery_client.query(
        query, location=location, job_config=job_config
    )
    # Wait for the query job to complete before reporting success.
    query_job.result()
    print('Query results saved to {}.query_results'.format(dataset_name))


# NOTE(review): 'my-dataset' is a placeholder; BigQuery dataset IDs may only
# contain letters, digits and underscores, so use a valid ID when running.
run_query('my-dataset', 'SELECT * FROM `my-dataset.my-table`')
通过以上代码示例,可以实现存储与查询分离的项目:首先创建 GCS 存储桶并将数据上传到其中,然后创建 BigQuery 数据集,并将数据从 GCS 存储桶导入到该数据集中;最后执行查询操作,并将查询结果保存到 BigQuery 数据集中的 query_results 表。