You can solve this with a BigQuery scheduled query. Here is a code example:
from google.cloud import bigquery_datatransfer


def create_scheduled_query(project_id, destination_dataset_id, query):
    """Create a scheduled query that runs every 24 hours."""
    client = bigquery_datatransfer.DataTransferServiceClient()

    transfer_config = bigquery_datatransfer.TransferConfig(
        destination_dataset_id=destination_dataset_id,
        display_name="Scheduled query transfer config",
        data_source_id="scheduled_query",
        params={
            "query": query,
            # Destination table name; {run_time|"%Y%m%d"} expands to the
            # run date, e.g. table_20240101.
            "destination_table_name_template": 'table_{run_time|"%Y%m%d"}',
            "write_disposition": "WRITE_TRUNCATE",
            "partitioning_field": "",
        },
        schedule="every 24 hours",
    )

    parent = f"projects/{project_id}/locations/US"
    transfer_config = client.create_transfer_config(
        parent=parent, transfer_config=transfer_config
    )
    print(f"Created transfer config: {transfer_config.name}")


if __name__ == "__main__":
    project_id = "YOUR_PROJECT_ID"
    destination_dataset_id = "DESTINATION_DATASET_ID"
    query = "SELECT * FROM `YOUR_PROJECT_ID.YOUR_DATASET_ID.YOUR_TABLE_ID`"
    create_scheduled_query(project_id, destination_dataset_id, query)
Replace the placeholder values above (YOUR_PROJECT_ID, DESTINATION_DATASET_ID, and the table reference in the query) with your own project and dataset information.
This example uses the bigquery_datatransfer library to create a transfer config for a scheduled query. In params, you set the query, the destination table name template, the write disposition, and an optional partitioning field. The schedule field controls how often the scheduled query runs.
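As a side note, the {run_time|"%Y%m%d"} placeholder in the table name template is expanded server-side by the Data Transfer Service at each run. The sketch below only previews locally what a given run time would produce, using Python's strftime (the preview_table_name helper is my own illustration, not part of the API):

```python
from datetime import datetime, timezone


def preview_table_name(fmt: str, run_time: datetime) -> str:
    """Locally mimic the run_time expansion in a destination table template.

    This is only an illustration of the strftime-style format string;
    the real expansion happens server-side in the Data Transfer Service.
    """
    return "table_" + run_time.strftime(fmt)


run_time = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(preview_table_name("%Y%m%d", run_time))  # table_20240101
```

With WRITE_TRUNCATE, writing to a date-stamped table like this gives you one fresh table per run rather than overwriting a single table.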
Make sure the BigQuery Data Transfer Service is enabled in your project and that you have the appropriate permissions to create transfer configs.