要使用Airflow和pandas的read_sql_query()函数以及commit函数,可以按照以下步骤操作:
import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from sqlalchemy import create_engine
dag = DAG(
dag_id='sql_task',
description='A simple DAG that reads data from SQL and commits it',
schedule_interval=None,
start_date=datetime(2022, 1, 1),
catchup=False
)
sql_query = "SELECT * FROM your_table" # 指定你的SQL查询语句
database_url = "your_database_url" # 指定你的数据库连接URL
def read_sql_and_commit():
# 创建数据库引擎
engine = create_engine(database_url)
# 读取SQL数据到pandas DataFrame
df = pd.read_sql_query(sql_query, engine)
# 在这里可以对DataFrame进行数据处理或其他操作
# 提交数据到数据库
df.to_sql("new_table", engine, if_exists='replace', index=False) # 将数据写入新的表new_table
# 创建PythonOperator任务
task = PythonOperator(
task_id='read_sql_and_commit',
python_callable=read_sql_and_commit,
dag=dag
)
task
这样,当你执行DAG时,它将运行read_sql_and_commit()函数,读取SQL数据并将其提交到数据库中。
注意:在使用pandas的to_sql()函数提交数据时,可以选择设置if_exists参数为'replace'、'append'或'fail',以指定如果表已经存在时的行为。