在Airflow中使用BigQueryOperator操作符可以实现将一个BQ表复制到另一个BQ表。以下是一个示例代码:
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('copy_bq_table', default_args=default_args, schedule_interval='@once')
copy_table_task = BigQueryOperator(
task_id='copy_table',
sql='CREATE OR REPLACE TABLE `project.dataset.destination_table` AS SELECT * FROM `project.dataset.source_table`',
use_legacy_sql=False,
destination_dataset_table='project.dataset.destination_table',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='bigquery_default',
dag=dag
)
copy_table_task
在这个示例中,我们创建了一个DAG,并定义了一个BigQueryOperator任务(copy_table_task)。在任务中,我们使用了CREATE OR REPLACE TABLE语句,将源表(source_table)的内容复制到目标表(destination_table)。我们还指定了一些参数,如use_legacy_sql=False,destination_dataset_table='project.dataset.destination_table',write_disposition='WRITE_TRUNCATE'等。这些参数可以根据具体需求进行调整。
请注意,你需要根据你的项目设置正确的BigQuery连接(bigquery_conn_id)和相关的表名。