要使用Python和Apache Airflow在BigQuery中创建表并加载数据,可以按照以下步骤操作:
确保已安装Apache Airflow和Google Cloud SDK,并设置好相关的环境变量。
在Airflow的DAG目录中创建一个新的Python脚本文件,命名为create_and_load_table.py
。
在脚本中导入所需的模块和库:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from google.cloud import bigquery
from datetime import datetime
def create_and_load_table():
# 设置Google Cloud项目ID和BigQuery数据集名称
project_id = 'your-project-id'
dataset_name = 'your-dataset-name'
# 创建一个BigQuery客户端
client = bigquery.Client(project=project_id)
# 定义一个新的表模式
schema = [
bigquery.SchemaField("column1", "STRING"),
bigquery.SchemaField("column2", "INTEGER"),
bigquery.SchemaField("column3", "FLOAT"),
]
# 创建一个新的表
table_ref = client.dataset(dataset_name).table("your-table-name")
table = bigquery.Table(table_ref, schema=schema)
table = client.create_table(table) # 创建表
# 加载数据到新表
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
job_config.autodetect = True
uri = "gs://your-bucket-name/your-data.csv" # 数据文件的GCS URI
load_job = client.load_table_from_uri(uri, table_ref, job_config=job_config)
load_job.result() # 等待加载作业完成
print("Table created and data loaded successfully.")
dag = DAG(
dag_id='create_and_load_table',
start_date=datetime(2021, 1, 1),
schedule_interval=None
)
create_table_task = PythonOperator(
task_id='create_table_task',
python_callable=create_and_load_table,
dag=dag
)
create_table_task
airflow scheduler
。这样就使用Python和Apache Airflow成功创建了一个BigQuery表并加载数据。请确保替换代码中的相应参数,如项目ID、数据集名称、表名称和数据文件的GCS URI。