阿里云
1024开发者盛宴之Java专家问答专场
发表主题 回复主题
  • 917阅读
  • 0回复

[技术交流]消息服务实时消费设备状态变化和数据

级别: 管理员
发帖
135
云币
241


0.准备工作



0.1 注册阿里账号


使用个人淘宝账号或手机号,开通阿里云账号,并通过实名认证(可以用支付宝认证)

0.2 免费开通IoT物联网套件


IoT套件产品官网 https://www.aliyun.com/product/iot
消息服务官网 https://mns.console.aliyun.com/#/list/cn-shanghai

0.3 软件环境


Nodejs安装 https://nodejs.org/en/download/
Java8 安装
开发工具:Sublime Text3/ IntelliJ IDEA
https://www.sublimetext.com/3

1.阿里云IoT控制台配置服务端订阅


阿里云IoT物联网套件开通 https://www.aliyun.com/product/iot

1.1 创建产品(基础版)


我们在阿里云IoT控制台,创建产品:空气检测,选择基础版。



1.2 在产品下添加设备


我们在阿里云IoT控制台,设备管理里空气检测产品下添加一个具体设备。



1.2 在产品下配置服务端订阅


我们在阿里云IoT控制台,为空气检测产品开通服务端订阅,勾选设备上报消息和设备状态变化通知,开通后会有MNS的区域:cn-shanghai和队列信息:aliyun-iot-a1jnUEKYhw4,如下。


通过阅读阿里云IoT文档,我们了解到队列中消息结构体如下:
  1. {
  2.   "payload": "Base64 Encode的数据",
  3.   "messagetype": "status",
  4.   "messageid": 996000000000000001,
  5.   "topic": "具体的设备Topic",
  6.   "timestamp": 1526450324
  7. }


  • messageid:IoT套件生成的消息ID

  • messagetype:指的是消息类型:设备状态status设备上报消息upload

  • topic:表示该消息源套件中的哪个topic,当messageType=status时,topic为null,当messageType=upload时,topic为具体的设备Topic

  • payload:数据为Base64 Encode的数据。当messageType=status时,数据是设备状态数据;当messageType=upload时,data即为设备发布到Topic中的原始数据。

  • timestamp:队列中消息生成时间戳,并非业务的时间戳




2.设备端开发


我们采用nodejs脚本模拟设备,与IoT云端建立连接,上报数据。

2.1 获取nodejs版IoT SDK


package.json中添加npm依赖"aliyun-iot-mqtt": "0.0.4"模块
  1. {
  2.   "name": "aliyun-iot",
  3.   "dependencies": {
  4.     "aliyun-iot-mqtt": "^0.0.4"
  5.   },
  6.   "author": "wongxming",
  7.   "license": "MIT"
  8. }
下载安装SDK
  1. $npm install



2.2 编写设备端应用程序代码


我们需要在控制台获取设备身份三元组:productKey,deviceName,deviceSecret,以及产品分区regionId
  1. /**
  2. * package.json 添加依赖:"aliyun-iot-mqtt": "0.0.4"
  3. node iot-mns.js
  4. */
  5. const mqtt = require('aliyun-iot-mqtt');
  6. //设备三元组
  7. const options = {
  8.     productKey: "RY8ExdyS6lU",
  9.     deviceName: "officeThermometer",
  10.     deviceSecret: "oauYYavdIV9QOt7d9WcrnIjXSNc2i26A",
  11.     regionId: "cn-shanghai"
  12. };
  13. //设备与云 建立连接,设备上线
  14. const client = mqtt.getAliyunIotMqttClient(options);
  15. //主题topic
  16. const topic = `${options.productKey}/${options.deviceName}/update`;
  17. var data = {
  18.     temperature: Math.floor((Math.random()*20)+10),
  19.     humidity: Math.floor((Math.random()*80)+20),
  20. };
  21. //指定topic发布数据到云端
  22. client.publish(topic, JSON.stringify(data));
  23. console.log("===postData topic=" + topic)
  24. console.log(data)
  25. //设备下线
  26. //client.end()



2.3 启动模拟设备脚本


  1. $node iot-mns.js

脚本执行后,我们在IoT云端控制台产品-日志服务里查看设备行为分析日志,根据时间顺序,我们看到
  1. 首先在10:53:05时,设备建立连接,上线;
  2. 然后在10:53:12时,设备断开连接,下线。
查看设备上行消息分析日志,根据时间顺序,我们看到
  1. 首先设备publish message,
  2. 然后流转到RuleEngine规则引擎,
  3. 最终Transmit MNS的消息队列aliyun-iot-a1jnUEKYhw4
我们切换到MNS控制台,选择华东2区域,可以看到消息队列aliyun-iot-a1jnUEKYhw4有3条活跃消息MNS控制台:https://mns.console.aliyun.com/#/list/cn-shanghai

3 消费队列中设备消息



3.1 消息服务MNS使用

我们以java开发为例,pom中添加依赖aliyun-sdk-mns,httpasyncclient,fastjson,如下:
  1. <dependencies>
  2.         <dependency>
  3.             <groupId>com.aliyun.mns</groupId>
  4.             <artifactId>aliyun-sdk-mns</artifactId>
  5.             <version>1.1.8</version>
  6.             <classifier>jar-with-dependencies</classifier>
  7.         </dependency>
  8.         <dependency>
  9.             <groupId>org.apache.httpcomponents</groupId>
  10.             <artifactId>httpasyncclient</artifactId>
  11.             <version>4.0.1</version>
  12.         </dependency>
  13.         <dependency>
  14.             <groupId>com.alibaba</groupId>
  15.             <artifactId>fastjson</artifactId>
  16.             <version>1.2.42</version>
  17.         </dependency>
  18.     </dependencies>

我们通过mns的sdk,创建连接,轮询获取队列消息。为了方便阅读,我们对消息的payload数据做base64解码,完整应用程序代码如下:
  1. import com.alibaba.fastjson.JSON;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.aliyun.mns.client.CloudAccount;
  4. import com.aliyun.mns.client.CloudQueue;
  5. import com.aliyun.mns.client.MNSClient;
  6. import com.aliyun.mns.model.Message;
  7. import org.apache.commons.codec.binary.Base64;
  8. public class ComsumerDemo {
  9.     public static String accessKeyId = "AK";
  10.     public static String accessKeySecret = "AK秘钥";
  11.     public static String endpoint = "mns公网Endpoint";
  12.     public static void main(String[] args) {
  13.         CloudAccount account = new CloudAccount(accessKeyId,accessKeySecret,endpoint);
  14.         MNSClient client = account.getMNSClient();
  15.         //获取消息队列
  16.         CloudQueue queue = client.getQueueRef("aliyun-iot-a1jnUEKYhw4");
  17.         while (true) {
  18.             Message popMsg = queue.popMessage(10);
  19.             if (popMsg != null) {
  20.                 System.out.println("message id: " + popMsg.getMessageId());
  21.                 System.out.println("message body: \n" + decodeBase64(popMsg.getMessageBodyAsString()));
  22.                 //删除消息
  23.                 queue.deleteMessage(popMsg.getReceiptHandle());
  24.             }
  25.         }
  26.     }
  27.     public static String decodeBase64(String jsonString) {
  28.         try {
  29.             JSONObject json = JSON.parseObject(jsonString);
  30.             String payload = new String(Base64.decodeBase64(json.getString("payload")));
  31.             json.put("payload", JSON.parseObject(payload));
  32.             return json.toJSONString();
  33.         } catch (Exception e) {
  34.             e.printStackTrace();
  35.         }
  36.         return null;
  37.     }
  38. }



3.2 运行程序


控制台输出如下,我们看到1) timestamp=1526450324时,设备上线消息,
  1. message id: E2CF179AD5686386-2-16367878417-200000009
  2. message body:
  3. {
  4.   "payload": {
  5.     "lastTime": "2018-05-16 13:58:44.413",
  6.     "clientIp": "42.120.74.246",
  7.     "time": "2018-05-16 13:58:44.427",
  8.     "productKey": "a1jnUEKYhw4",
  9.     "deviceName": "suw8umOqgJ72yCADerZp",
  10.     "status": "online"
  11.   },
  12.   "messagetype": "status",
  13.   "messageid": 996631012001751041,
  14.   "timestamp": 1526450324
  15. }
2) timestamp=1526450334时,设备上报了数据,通过payload可以看到完整数据
  1. message id: "656A4F66B391367-1-1636787AAC0-20000000C"
  2. message body:
  3. {
  4.   "payload": {
  5.     "temperature": 14,
  6.     "humidity": 116
  7.   },
  8.   "messagetype": "upload",
  9.   "topic": "/a1jnUEKYhw4/suw8umOqgJ72yCADerZp/update",
  10.   "messageid": 996631053735047169,
  11.   "timestamp": 1526450334
  12. }

3) timestamp=1526450353时,设备下线消息


  1. message id: E2CF179AD5686386-2-1636787F5F1-20000000A
  2. message body:
  3. {
  4.   "payload": {
  5.     "lastTime": "2018-05-16 13:59:04.381",
  6.     "time": "2018-05-16 13:59:13.571",
  7.     "productKey": "a1jnUEKYhw4",
  8.     "deviceName": "suw8umOqgJ72yCADerZp",
  9.     "status": "offline"
  10.   },
  11.   "messagetype": "status",
  12.   "messageid": 996631134240534528,
  13.   "timestamp": 1526450353
  14. }





发表主题 回复主题
« 返回列表上一主题下一主题

限100 字节
批量上传需要先选择文件,再选择上传
 
验证问题: 12 + 4 = ?
上一个 下一个