以下是一个使用Airflow连接到SQL Server,并将查询结果导入到数据帧中的示例代码:
from datetime import datetime
import pandas as pd
from airflow import DAG
from airflow.hooks.mssql_hook import MsSqlHook
from airflow.operators.python_operator import PythonOperator
def query_sql_server():
# 连接到SQL Server数据库
hook = MsSqlHook(mssql_conn_id='your_mssql_conn_id')
# 编写SQL查询语句
sql_query = "SELECT * FROM your_table"
# 执行SQL查询,并将结果存储在pandas数据帧中
df = hook.get_pandas_df(sql_query)
# 打印查询结果
print(df)
dag = DAG(
dag_id='sql_server_to_dataframe',
start_date=datetime(2022, 1, 1),
schedule_interval='@once'
)
task = PythonOperator(
task_id='query_sql_server',
python_callable=query_sql_server,
dag=dag
)
task
在上面的示例代码中,我们首先导入了所需的模块和类。然后定义了一个名为query_sql_server
的函数,该函数包含了连接到SQL Server数据库、执行查询并将结果存储在数据帧中的逻辑。
接下来,我们创建了一个DAG对象,并指定了其唯一的dag_id
、开始日期和调度间隔。
最后,我们创建了一个PythonOperator
任务,将query_sql_server
函数作为其可调用对象,并将其添加到DAG中。
请注意,上述代码中的mssql_conn_id
参数需要替换为您的SQL Server连接的ID。您可以在Airflow的连接页面(Admin -> Connections)中创建一个新的连接,并使用相应的ID替换your_mssql_conn_id
。
希望以上代码对您有所帮助!