可以通过以下方式设置Airflow BigQueryInsertJobOperator的配置:
在Airflow DAG文件中导入所需的库:
from airflow.contrib.operators.bigquery_operator import BigQueryInsertJobOperator
from airflow.models import DAG
from datetime import datetime
设置BigQuery插入作业的配置:
bq_insert = BigQueryInsertJobOperator(
task_id='mytask',
provide_context=True,
configuration={
'query': {
'query': 'SELECT id, name, age FROM mytable',
'destinationTable': {
'projectId': 'myproject',
'datasetId': 'mydataset',
'tableId': 'mytable_new',
},
'createDisposition': 'CREATE_IF_NEEDED',
'writeDisposition': 'WRITE_TRUNCATE',
},
},
dag=my_dag,
)
其中,'query'是必需的部分,包含要执行的查询和目标表的描述。'createDisposition'和'writeDisposition'选项用于控制当目标表不存在或有重复行时如何处理数据。
将task添加到DAG中并设置其他参数,如开始时间、计划等。
需要注意的是,此方法仅适用于Airflow v1.10.10及更高版本。如果Airflow版本过低,请升级到最新版本。