要使用Apache Airflow的BigQuery Operator执行任务,首先需要引入相关模块:
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime
接下来,可以创建一个DAG并定义任务:
dag = DAG(
'bigquery_operator_example',
description='An example DAG to run BigQuery Operator',
schedule_interval='0 0 * * *',
start_date=datetime(2022, 1, 1),
catchup=False
)
task = BigQueryOperator(
task_id='bigquery_task',
sql='SELECT * FROM `your_project.your_dataset.your_table`',
destination_dataset_table='your_project.your_dataset.your_output_table',
write_disposition='WRITE_APPEND',
dag=dag
)
在上面的代码示例中,sql
参数指定了要执行的BigQuery查询语句。destination_dataset_table
参数指定了查询结果要写入的目标表。write_disposition
参数定义了写入目标表的方式,此处使用了WRITE_APPEND
表示追加写入。
最后,将任务添加到DAG中并设置任务之间的依赖关系:
task
完整的代码示例如下:
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime
dag = DAG(
'bigquery_operator_example',
description='An example DAG to run BigQuery Operator',
schedule_interval='0 0 * * *',
start_date=datetime(2022, 1, 1),
catchup=False
)
task = BigQueryOperator(
task_id='bigquery_task',
sql='SELECT * FROM `your_project.your_dataset.your_table`',
destination_dataset_table='your_project.your_dataset.your_output_table',
write_disposition='WRITE_APPEND',
dag=dag
)
task
请根据实际情况替换your_project
、your_dataset
和your_table
为正确的项目、数据集和表名。另外,还可以根据需要设置其他参数,例如create_disposition
、use_legacy_sql
等。