在Airflow PostgresHook中获取upsert行为通常需要在查询中使用ON CONFLICT子句。以下是一个示例:
from airflow.hooks.postgres_hook import PostgresHook
def upsert_table(): pg_hook = PostgresHook(postgres_conn_id='postgres_default')
# Set primary key and columns to update
id_col = 'id'
update_cols = ['col1', 'col2']
set_values = [f"{col}=origin.{col}" for col in update_cols]
set_values_str = ', '.join(set_values)
# Build the query
query = f"""
INSERT INTO my_table ({id_col}, {', '.join(update_cols)})
VALUES (%s, %s, %s)
ON CONFLICT ({id_col}) DO UPDATE SET {set_values_str}
"""
# Execute the query
pg_hook.run(query, parameters=(1, 'value1', 'value2'))
在查询中,您需要设置要更新的列和主键列。然后使用set_values_str来构建ON CONFLICT子句中的SET值字符串。最后,使用PostgresHook.run()方法执行查询。