确保你的Google Cloud账户有足够的BigQuery权限。
在Airflow DAG中使用BigQueryOperator任务时,添加partition_field参数。
例如:
insert_data_to_partitioned_table = BigQueryOperator(
task_id='insert_data_to_partitioned_table',
sql='INSERT INTO `your_project.your_dataset.your_partitioned_table` (date, value) VALUES ("{{ds}}", {{task_instance.xcom_pull(task_ids='get_value')}})',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
allow_large_results=True,
params={'project': 'your_project'},
bigquery_conn_id='bigquery_default',
partition_field='date'
)
这里的partition_field指定了表的分区字段。
insert_data_to_partitioned_table = BigQueryOperator(
task_id='insert_data_to_partitioned_table',
sql='INSERT INTO `your_project.your_dataset.your_partitioned_table` (date, value) VALUES ("{{ds}}", {{task_instance.xcom_pull(task_ids='get_value')}})',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
allow_large_results=True,
params={'project': 'your_project'},
bigquery_conn_id='bigquery_default',
partition_field='date',
time_partitioning={
'type': 'DAY',
'field': 'date'
}
)
这里的time_partitioning指定了表的分区方式为按天分区。