如果在Airflow 1.10.15中使用了运行DB函数的Hooks,则这些Hooks必须从DBApiHook继承。可以采用以下代码示例解决该问题:
from airflow.hooks.dbapi_hook import DbApiHook
class CustomHook(DbApiHook):
def __init__(self, conn_id):
super().__init__(conn_id)
def run_my_db_function(self, param1, param2):
with self.get_conn() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM my_table WHERE col1 = %s AND col2 = %s", (param1, param2))
result = cur.fetchall()
return result
在上述示例中,自定义的Hook类CustomHook从DbApiHook继承,并使用get_conn()方法获得数据库连接以运行DB函数。此类还包括一个名为run_my_db_function()的示例DB函数,该函数将参数传递给查询,以检索与给定列值匹配的行。