要在Airflow中使用PostgresOperator执行SQL文件,可以使用以下解决方法:
import os
dag_folder = os.path.dirname(os.path.realpath(__file__))
open
函数打开SQL文件并读取其中的SQL语句。可以使用以下代码示例:sql_file_path = os.path.join(dag_folder, 'sql_file.sql')
with open(sql_file_path, 'r') as file:
sql_query = file.read()
from airflow.operators.postgres_operator import PostgresOperator
task = PostgresOperator(
task_id='execute_sql',
sql=sql_query,
postgres_conn_id='postgres_default',
autocommit=True,
dag=dag
)
在上述代码中,sql_query
是从SQL文件中读取的SQL语句。postgres_conn_id
是Airflow配置中定义的PostgreSQL连接的ID。autocommit=True
表示在执行SQL语句后自动提交事务。
确保将上述代码放在DAG文件中,以便正确执行SQL文件中的查询。