要使用Airflow将PostgreSQL表导出到CSV文件中,您可以使用PostgreSQLOperator和PythonOperator来完成该任务。以下是一个示例解决方案:
首先,您需要安装所需的依赖项。您可以使用以下命令安装Airflow和PostgreSQL库:
pip install apache-airflow
pip install psycopg2
接下来,您可以创建一个DAG文件,例如export_postgresql_table.py
,并将以下代码放入该文件中:
from datetime import datetime
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
import psycopg2
import csv
# 连接到PostgreSQL数据库
def connect_postgresql():
conn = psycopg2.connect(
host='',
database='',
user='',
password=''
)
return conn
# 使用COPY导出PostgreSQL表
def export_postgresql_table():
conn = connect_postgresql()
cursor = conn.cursor()
# 查询表中的数据
cursor.execute("SELECT * FROM ")
rows = cursor.fetchall()
# 将数据写入CSV文件
with open('/path/to/export.csv', 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
writer.writerows(rows)
cursor.close()
conn.close()
print("Table exported successfully!")
# 定义DAG
dag = DAG(
'export_postgresql_table',
description='Export PostgreSQL table to CSV',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False
)
# 导出表的任务
export_task = PythonOperator(
task_id='export_table',
python_callable=export_postgresql_table,
dag=dag
)
# 设置依赖关系
export_task
在上面的代码中,您需要将
,
,
,
和
替换为您的实际值。您还需要将/path/to/export.csv
替换为您希望导出到的实际CSV文件路径。
接下来,使用以下命令启动Airflow Web服务器和调度程序:
airflow webserver -p 8080
airflow scheduler
最后,使用以下命令将DAG文件导入Airflow并运行任务:
airflow dags unpause export_postgresql_table
airflow dags trigger export_postgresql_table
完成后,您将在指定的CSV文件路径下找到导出的PostgreSQL表数据。