要将Airflow与SQLAlchemy连接到元数据数据库,并设置短寿命连接,可以按照以下步骤进行操作:
安装所需的库:
pip install apache-airflow
pip install sqlalchemy
创建一个Airflow DAG文件,例如example_dag.py
,并添加以下代码:
from airflow import DAG
from airflow.models import Variable
from airflow.utils.dates import days_ago
from sqlalchemy import create_engine
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}
dag = DAG(
'example_dag',
default_args=default_args,
schedule_interval='@daily',
)
def connect_to_database():
# 从Airflow变量中获取数据库连接信息
db_username = Variable.get("db_username")
db_password = Variable.get("db_password")
db_host = Variable.get("db_host")
db_port = Variable.get("db_port")
db_name = Variable.get("db_name")
# 创建数据库连接字符串
connection_string = f"postgresql://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}"
# 使用SQLAlchemy创建数据库连接引擎
engine = create_engine(connection_string)
# 在此处执行需要连接到数据库的操作
# ...
# 关闭数据库连接
engine.dispose()
# 创建一个任务
connect_to_database_task = PythonOperator(
task_id='connect_to_database_task',
python_callable=connect_to_database,
dag=dag,
)
airflow.cfg
中配置元数据数据库连接信息。找到以下部分,并根据你的数据库配置进行修改:[core]
...
sql_alchemy_conn = postgresql+psycopg2://:@:/
...
在Airflow的Web界面中创建所需的变量。在“Admin” -> “Variables”页面中,创建以下变量(名称和值根据你的数据库配置进行修改):
启动Airflow调度程序:
airflow scheduler
airflow dags trigger example_dag
通过这种方式,你可以在Airflow中使用SQLAlchemy连接到元数据数据库,并且确保连接的生命周期较短。