确认您的Google Cloud账户已经授予了适当的权限,以让您的Airflow DAG能够对该数据集进行查询。
在您的代码中提供Google Cloud账户密钥或者使用项目默认的Google Cloud服务账户。
确保BigQueryOperator的参数project_id设置正确,与您的Google Cloud项目ID匹配。
示例代码:
import datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
# 按照您的要求定义DAG
dag = DAG('my_dag', description='Query bigquery-public-dataset',
schedule_interval='0 0 * * *',
start_date=datetime.datetime(2019, 1, 1),
catchup=False)
# BigQueryOperator 构造器的第一个参数是任务名称,第二个参数是用于查询的SQL语句。
# 您可以使用“project_id”和“use_legacy_sql”等其他参数,具体根据您的要求进行调整。
# 这里以bigquery-public-dataset.SAMPLES.TRAIN为例,该数据集是公共数据集中的一个例子。
bq_operator = BigQueryOperator(
task_id='my_bq_task',
sql='SELECT name, year, num FROM `bigquery-public-data.samples.natality` LIMIT 10;',
destination_dataset_table='my_destination_table',
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
dag=dag)
# 在DAG中添加BigQueryOperator任务
bq_operator