是的,Big Query数据传输服务在使用composer时需要使用服务账户进行认证。具体步骤如下:
from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
create_table = BigQueryCreateEmptyTableOperator(
task_id='create_table',
project_id='my_project',
dataset_id='my_dataset',
table_id='my_table',
schema_fields=[
{'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
{'name': 'name', 'type': 'STRING', 'mode': 'REQUIRED'},
{'name': 'age', 'type': 'INTEGER', 'mode': 'REQUIRED'}
],
gcp_conn_id='my_gcp_connection',
google_cloud_storage_conn_id='my_gcs_connection',
location='US'
)
create_table.run(
start_date=airflow.utils.dates.days_ago(1),
end_date=airflow.utils.dates.days_ago(1),
ignore_ti_state=True
)
其中,gcp_conn_id和google_cloud_storage_conn_id参数需要替换为您自己的连接ID,location参数需要根据您所在的地理位置进行设置。
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service_account_key.json
将路径替换为您自己的密钥文件路径即可。