Google Cloud Platform提供了BigQuery数据传输服务,它可以将数据从外部数据源移动到BigQuery中。在某些情况下,外部数据源中的日期或月份可能只包含一个数字,例如1月可能只是1。但是,BigQuery默认情况下不支持这种格式。
为了正确加载包含单个数字日期和月份的数据,我们需要先将其解析并转换为标准格式。以下示例使用Dataflow读取一个包含单个数字日期和月份的CSV文件,在流水线中将日期补齐为两位数,然后将结果写入新的BigQuery表。
import csv

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class FormatDates(beam.DoFn):
    """Zero-pad single-digit day and month at the start of a CSV line.

    Each element is expected to begin with a slash-separated date,
    e.g. ``"1/2/2020,5"``. The day and month components are left-padded
    to two digits; everything after the second slash (the year plus any
    trailing CSV columns) is passed through unchanged, yielding
    ``"01/02/2020,5"``.
    """

    def process(self, element):
        # maxsplit=2 keeps any trailing ",value" columns attached to the
        # final component instead of relying on them never containing "/".
        day, month, rest = element.split("/", 2)
        # zfill pads "1" -> "01" and leaves already-padded values alone.
        yield "{}/{}/{}".format(day.zfill(2), month.zfill(2), rest)
def run():
    """Build and run a pipeline: read a CSV from GCS, normalize single-digit
    days/months in the date column, and write rows to a BigQuery table.

    Reads ``gs://<BUCKET>/<FILENAME>`` (header skipped), zero-pads the date
    via :class:`FormatDates`, converts each line into a BigQuery row dict,
    and writes to ``<DATASET>.<TABLE>`` with schema ``date:DATE,
    value:INTEGER``.
    """
    PROJECT_ID = 'my-project'
    BUCKET = 'my-bucket'
    FILENAME = 'data.csv'
    DATASET = 'my-dataset'
    TABLE = 'my-table'
    SCHEMA = 'date:DATE, value:INTEGER'
    runner = 'DirectRunner'

    # beam.Pipeline requires a PipelineOptions instance; passing a plain
    # dict raises at construction time.
    options = PipelineOptions(
        project=PROJECT_ID,
        staging_location="gs://{}/staging/".format(BUCKET),
        temp_location="gs://{}/temp/".format(BUCKET),
        runner=runner,
    )

    def to_row(line):
        """Convert ``"dd/mm/yyyy,value"`` into a BigQuery row dict.

        WriteToBigQuery expects dict rows keyed by the schema fields, not
        raw CSV strings, and a DATE column must be ISO ``YYYY-MM-DD``.
        """
        date_part, value_part = line.split(",", 1)
        day, month, year = date_part.split("/")
        return {
            "date": "{}-{}-{}".format(year, month, day),
            "value": int(value_part),
        }

    # The context manager runs the pipeline and waits for completion,
    # even if an exception escapes the construction code.
    with beam.Pipeline(options=options) as p:
        data = (
            p
            | "Read from CSV" >> beam.io.ReadFromText(
                "gs://{}/{}".format(BUCKET, FILENAME),
                skip_header_lines=1)
        )
        formatted = data | "Format Dates" >> beam.ParDo(FormatDates())
        rows = formatted | "To BigQuery Rows" >> beam.Map(to_row)
        _ = (
            rows
            | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
                table="{}.{}".format(DATASET, TABLE),
                schema=SCHEMA,
                # NOTE: method="STREAMING_INSERTS" was removed — streaming
                # inserts cannot honor WRITE_TRUNCATE; the default
                # file-loads method supports truncation.
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=(
                    beam.io.BigQueryDisposition.CREATE_IF_NEEDED),
                project=PROJECT_ID)
        )
# Script entry point: build and execute the pipeline.
if __name__ == "__main__":
    run()
在