要使用Airflow从数据库中获取数据并将其打印出来,你需要遵循以下步骤:
安装所需的库:
pip install apache-airflow
pip install sqlalchemy
创建一个DAG文件(例如,get_data_dag.py
),并导入所需的库:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from sqlalchemy import create_engine
# 定义DAG参数
default_args = {
'owner': 'airflow',
'start_date': datetime(2022, 1, 1),
}
# 创建DAG对象
dag = DAG(
'get_data_dag',
default_args=default_args,
schedule_interval='@daily',
)
创建一个Python函数,该函数将从数据库中获取数据并将其打印出来:
def get_data_from_db():
# 创建数据库连接
engine = create_engine('YOUR_DATABASE_CONNECTION_STRING')
# 执行SQL查询
query = 'SELECT * FROM your_table'
result = engine.execute(query)
# 打印查询结果
for row in result:
print(row)
创建一个PythonOperator,将上述函数作为其参数,并将其添加到DAG中:
task = PythonOperator(
task_id='get_data_task',
python_callable=get_data_from_db,
dag=dag,
)
运行Airflow的调度程序,以执行该DAG:
airflow scheduler
可以通过以下命令检查DAG的运行日志:
airflow logs -dag_id get_data_dag
这样,每天调度一次的get_data_task
任务将从数据库中获取数据并将其打印出来。请确保将YOUR_DATABASE_CONNECTION_STRING
替换为数据库的连接字符串。