您可以使用Python中的pandas
和pyarrow
库来处理Avro文件,并将日期列数据类型从long转换为timestamp类型,然后将其加载到BigQuery中。
下面是一个完整的代码示例:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import bigquery
# 读取Avro文件
avro_file = 'path/to/your/file.avro'
avro_table = pa.ipc.open_file(avro_file)
avro_data = avro_table.read_pandas()
# 将日期列数据类型从long转换为timestamp
avro_data['date_column'] = pd.to_datetime(avro_data['date_column'], unit='ms')
# 将数据保存为Parquet文件
parquet_file = 'path/to/your/file.parquet'
table = pa.Table.from_pandas(avro_data)
pq.write_table(table, parquet_file)
# 将Parquet文件加载到BigQuery中
bq_client = bigquery.Client()
dataset_id = 'your-dataset-id'
table_id = 'your-table-id'
table_ref = bq_client.dataset(dataset_id).table(table_id)
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.PARQUET
job_config.autodetect = True
with open(parquet_file, 'rb') as source_file:
job = bq_client.load_table_from_file(source_file, table_ref, job_config=job_config)
job.result() # 等待加载作业完成
print('数据已成功加载到BigQuery。')
请确保您已经安装了所需的Python库,并将path/to/your/file.avro
替换为您的Avro文件路径,path/to/your/file.parquet
替换为要保存的Parquet文件路径,your-dataset-id
和your-table-id
替换为您要加载数据的BigQuery数据集和表的ID。
这段代码将读取Avro文件,将日期列数据类型从long转换为timestamp,并将数据保存为Parquet文件。然后,它将使用BigQuery的Python客户端库将Parquet文件加载到BigQuery中。最后,它会等待加载作业完成并打印成功消息。