在AWS中正确集成PostgreSQL和Kinesis的方式是使用AWS Database Migration Service (DMS) 和AWS Lambda。
下面是一个基本的代码示例,展示了如何使用AWS Lambda将PostgreSQL数据流式传输到Kinesis流。
import psycopg2
import json
import boto3
def lambda_handler(event, context):
# 连接到PostgreSQL数据库
conn = psycopg2.connect(
host='',
port=,
database='',
user='',
password=''
)
# 创建一个Kinesis客户端
kinesis = boto3.client('kinesis')
# 执行PostgreSQL查询
cursor = conn.cursor()
cursor.execute('SELECT * FROM ')
# 获取查询结果
rows = cursor.fetchall()
# 将查询结果逐行发送到Kinesis流中
for row in rows:
data = {
'column1': row[0],
'column2': row[1],
# 根据需要添加更多的列
}
# 将数据转换为JSON格式
json_data = json.dumps(data)
# 将数据发送到Kinesis流
response = kinesis.put_record(
StreamName='',
PartitionKey='1', # 可以根据需要修改分区键
Data=json_data
)
# 关闭数据库连接
conn.close()
return {
'statusCode': 200,
'body': 'Data streamed to Kinesis successfully'
}
请注意,上述代码示例仅为基本示例,您需要根据实际情况进行修改和调整。确保替换代码中的占位符(如
此外,您还需要确保已正确设置AWS Lambda函数和Kinesis流的权限,并在AWS Lambda的触发器设置中设置适当的触发器(例如,按计划或按需触发)。