在Airflow中,可以使用BigQueryToPostgresOperator
来将BigQuery中的数据导入到PostgreSQL数据库中。以下是一个示例代码,其中提供了项目ID(project_id):
from airflow import DAG
from airflow.contrib.operators.bigquery_to_postgres import BigQueryToPostgresOperator
from datetime import datetime
default_args = {
'start_date': datetime(2022, 1, 1),
}
dag = DAG('bigquery_to_postgres_example', default_args=default_args, schedule_interval=None)
bq_to_pg_operator = BigQueryToPostgresOperator(
task_id='bigquery_to_postgres_task',
sql='SELECT * FROM `bigquery_dataset.bigquery_table`',
destination_table='postgres_table',
project_id='your_project_id',
write_disposition='WRITE_TRUNCATE',
postgres_conn_id='postgres_connection',
dag=dag
)
bq_to_pg_operator
在上面的代码中,我们创建了一个BigQueryToPostgresOperator
任务,其中指定了BigQuery中的SQL查询(sql
),PostgreSQL中的目标表名(destination_table
),项目ID(project_id
),写入方式(write_disposition
,在这里设置为WRITE_TRUNCATE
,表示每次运行任务时先清空目标表再写入数据),以及PostgreSQL连接(postgres_conn_id
,需要在Airflow的连接配置中预先设置好)。
请将your_project_id
替换为您的实际项目ID,并根据需要修改其他参数。