Airflow提供了BigQueryOperator,可以在Airflow中使用该操作符来执行BigQuery的任务。下面是一个示例,展示如何使用BigQueryOperator将一个表复制到另一个表:
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime
# 定义DAG的默认参数
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
# 定义DAG
dag = DAG('copy_table_to_another_table',
default_args=default_args,
schedule_interval='@once')
# 定义BigQueryOperator任务
copy_table_task = BigQueryOperator(
task_id='copy_table',
sql='''INSERT INTO `project.dataset.table2`
SELECT *
FROM `project.dataset.table1`''',
use_legacy_sql=False,
destination_dataset_table='project.dataset.table2',
write_disposition='WRITE_TRUNCATE', # 如果表已存在,则会清空并重新写入
dag=dag
)
# 设置任务的依赖关系
copy_table_task
在上述示例中,我们创建了一个名为copy_table_to_another_table
的DAG,其中包含一个名为copy_table
的任务。该任务使用BigQueryOperator执行一个SQL查询,将project.dataset.table1
的数据复制到project.dataset.table2
中。
在BigQueryOperator中,我们设置了以下参数:
sql
:要执行的SQL查询。use_legacy_sql
:指定是否使用传统SQL语法。在这个例子中,我们将其设置为False,表示使用标准SQL语法。destination_dataset_table
:目标表的名称,格式为project.dataset.table
。write_disposition
:写入操作的设置。在这个例子中,我们将其设置为WRITE_TRUNCATE
,表示如果表已存在,则会清空并重新写入。请注意,你需要根据自己的实际情况修改project.dataset.table1
和project.dataset.table2
的值。
你可以在Airflow中运行这个DAG,它将使用BigQueryOperator将一个表复制到另一个表。