可以通过以下示例代码,使用 AWS Python SDK (Boto3) 来检查设备是否在线。
import boto3
import time
iot = boto3.client('iot')
def get_device_status(device_id):
response = iot.describe_endpoint(endpointType='iot:Data-ATS')
endpoint = response['endpointAddress']
client_id = device_id + str(time.time())
mqtt = boto3.client('iot-data', endpoint_url=endpoint)
try:
response = mqtt.publish(
topic='$aws/things/{}/shadow/get'.format(device_id),
qos=1,
payload='{}')
response = mqtt.subscribe(topic='$aws/things/{}/shadow/update'.format(device_id), qos=1)
status_response = ''
while True:
status_response = mqtt.receive()
if status_response.get('status'):
break
shadow = json.loads(status_response['payload'].decode('utf-8'))['state']['reported']
return shadow['state']['reported']['connected']
except Exception as e:
print("Exception occurred: ", e)
return False
def main():
device_id_list = ['device_1', 'device_2', 'device_3']
for device_id in device_id_list:
status = get_device_status(device_id)
print(f"Device {device_id} is online: {status}")
if __name__ == "__main__":
main()
这个示例代码将会检查 AWS IoT 中给定设备的在线状态,使用 describe_endpoint()
函数返回 AWS IoT 数据端点的 URL,使用 MQTT 协议来进行数据传输,并对设备的在线状态进行了判断和输出。