在 AWS IoT 规则中添加后置操作,将 IoT Core 的 MQTT 消息发布到 Timestream,同时使用 AWS Identity and Access Management (IAM) 角色授权 Timestream 接收消息。以下是示例代码:
import boto3
# 创建 Timestream 客户端
timestream = boto3.client('timestream-write')
# 创建数据库
timestream.create_database(DatabaseName='my_database')
# 创建表格
table = {
'DatabaseName': 'my_database',
'TableName': 'my_table',
'RetentionProperties': {
'MemoryStoreRetentionPeriodInHours': 24,
'MagneticStoreRetentionPeriodInDays': 7
},
'Tags': [
{'Key': 'env', 'Value': 'dev'}
]
}
timestream.create_table(**table)
import boto3
# 创建 IAM 客户端
iam = boto3.client('iam')
# 创建 IAM 角色
role = iam.create_role(
RoleName='iot-timestream-role',
AssumeRolePolicyDocument={
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Principal': {'Service': 'iot.amazonaws.com'},
'Action': 'sts:AssumeRole'
}]
}
)
# 授权给 Timestream
policy = {
'Version': '2012-10-17',
'Statement': [{
'Effect': 'Allow',
'Action': 'timestream:WriteRecords',
'Resource': f'arn:aws:timestream:{region}:{account_id}:database/my_database/table/my_table'
}]
}
iam.put_role_policy(
RoleName='iot-timestream-role',
PolicyName='timestream-policy',
PolicyDocument=json.dumps(policy)
)
SELECT *, topic() as topic, timestamp() as ts INTO 'my_database.my_table'
FROM 'iot_topic/#'