这个错误通常是由 BigQuery 的权限设置导致的。在 Airflow 中使用 BigQueryHook 时,请确保你所用的 GCP 账户拥有 BigQuery 的管理权限。以下是正确设置 BigQueryHook 的代码示例:
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import datetime
# Default arguments handed to the DAG below.
default_args = dict(
    owner="airflow",
    start_date=datetime.datetime(year=2021, month=1, day=1),
)

# DAG object that the task in this file is attached to.
dag = DAG(dag_id="example_dag", default_args=default_args)
def create_bigquery_table():
    """Create a three-column BigQuery table through Airflow's BigQueryHook.

    Builds a standard-SQL ``CREATE TABLE`` statement for the table named by
    the ``project_id`` / ``dataset_id`` / ``table_id`` values set below and
    executes it on a cursor obtained from the hook's connection. The cursor
    and the connection are closed even when the statement fails.

    Takes no arguments and returns ``None``; any exception raised by the
    hook, the connection or the cursor propagates to the caller.
    """
    # Replace with your GCP project ID.
    project_id = "your_project_id"
    # Replace with your BigQuery dataset ID.
    dataset_id = "your_dataset_id"
    # Replace with your BigQuery table ID.
    table_id = "your_table_id"

    # Build the table path from the settings above so that editing them
    # actually changes the target table. The backticks keep the path valid
    # when the project ID contains a hyphen.
    sql_create_table = """
    CREATE TABLE `{project}.{dataset}.{table}` (
        column1 STRING,
        column2 INTEGER,
        column3 FLOAT
    )
    """.format(project=project_id, dataset=dataset_id, table=table_id)

    # NOTE(review): the hook constructor does not declare a ``project_id``
    # parameter, so it is no longer passed here; the target project is
    # already spelled out in the fully-qualified table name. Confirm against
    # the installed Airflow version.
    bq_hook = BigQueryHook(use_legacy_sql=False)

    conn = bq_hook.get_conn()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql_create_table)
        finally:
            # Release the cursor even if the DDL statement raised.
            cursor.close()
    finally:
        conn.close()
# Task that runs create_bigquery_table as a PythonOperator attached to `dag`.
create_bigquery_table_task = PythonOperator(
    dag=dag,
    python_callable=create_bigquery_table,
    task_id="create_bigquery_table",
)