一、将设备消息添加到MQ中
1、配置规则引擎
分别将设备生命周期事件
和设备数据点消息
都连接上同一个topic
如图,两个规则,一个是设备生命周期事件,也就是设备创建、删除、上线、离线记录;另一个是设备数据点消息
2、本地代码订阅消息
在设备连接的MQ消息topic中添加订阅yuyun-topic-data
,用于本地工程订阅MQ发送的消息
本地代码添加订阅:
运行demo,如下如所示,一直接收数据就算订阅成功了
二、数据消息格式
1、设备数据点消息
参数 | 属性 | 类型 | 说明 | 示例 |
---|---|---|---|---|
sysProperty | messageType | string | 消息类型:固定为deviceDatapoint | |
productId | string | 产品ID | 90273 | |
appProperty | deviceId | string | 设备ID | 102839 |
dataTimestamp | int | 设备数据点生产时间戳,单位毫秒,设备上传时可自定义携带 | 15980987429000 | |
datastream | string | 数据流名称 | weather | |
body | object/string/… | 详细的数据点消息内容 | 见如下示例 |
本地接收到的数据实例:
{
"sysProperty": {
"messageType": "deviceDatapoint",
"productId": "90273",
},
"appProperty":{
"deviceId": "102839",
"dataTimestamp": 15980987429000,
"datastream":"weather"
},
"body":{
"temperature": 30,
"humidity": "47%"
}
}
数据中有一个body
参数,里面的数据类型可以是:JSON、数值、字符串和二进制
"body":{
"temperature": 30,
"humidity": "47%"
}
"body": 10
"body": 11.55
// String类型
"body":"sunny with wind"
- 数据格式为二进制数据时,body中数据为二进制数据的索引号 index,示例如下,用户可以通过该索引号通过API获取该数据
"body":{
"index": "3491506_1475204886914_bin"
}
2、设备生命周期事件消息
参数 | 属性 | 类型 | 说明 | 示例 |
---|---|---|---|---|
sysProperty | messageType | string | 消息类型:固定为deviceLifeCycle | |
productId | string | 产品ID | 90273 | |
appProperty | deviceId | string | 设备ID | 102839 |
dataTimestamp | int | 设备消息生产时间戳,单位毫秒 | 15980987429000 | |
body | object | 创建、删除、上线、离线 created/deleted/online/offline | 示例如下 |
{
"sysProperty": {
"messageType": "deviceLifeCycle",
"productId": "90273",
},
"appProperty":{
"deviceId": "102839",
"dataTimestamp": 15980987429000,
},
"body":{
"event": "online"
}
}
三、新建数据库表
新建两个表,一个存放设备生命周期事件,一个存放设备数据点消息
CREATE TABLE `device_data` (
`id` bigint NOT NULL AUTO_INCREMENT,
`product_id` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '产品ID',
`device_id` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '设备ID',
`data_stream` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '数据流名称',
`data_value` double(9, 2) NULL DEFAULT NULL COMMENT '数据',
`date_time` datetime NULL DEFAULT NULL COMMENT '设备数据点产生时间',
PRIMARY KEY (`id`) USING BTREE,
INDEX `product_device_id`(`product_id`, `device_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '设备数据点消息' ROW_FORMAT = DYNAMIC;
CREATE TABLE `device_event` (
`id` bigint NOT NULL AUTO_INCREMENT,
`product_id` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '产品ID',
`device_id` varchar(20) CHARACTER NULL DEFAULT NULL COMMENT '设备ID',
`date_time` datetime NULL DEFAULT NULL COMMENT '时间',
`event` varchar(10) CHARACTER NULL DEFAULT NULL COMMENT '生命周期事件',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '设备生命周期事件表' ROW_FORMAT = DYNAMIC;
四、创建SpringBoot+Mybatis-Plus项目
创建项目可以参照:新建一个Spring Boot+MyBatis-Plus项目
最终建成的项目结构如下:
新建一个mq包,将配置好的demo中的东西搬过来,配置方法链接:https://yuyun.blog.csdn.net/article/details/122960344
搬过来之后就是这样:
此时pom.xml文件内容如下:
project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
modelVersion>4.0.0modelVersion>
parent>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-parentartifactId>
version>2.6.1version>
relativePath/>
parent>
groupId>com.yuyungroupId>
artifactId>springboot-onenetartifactId>
version>0.0.1-SNAPSHOTversion>
name>springboot-onenetname>
description>springboot-onenetdescription>
properties>
java.version>1.8java.version>
properties>
dependencies>
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starterartifactId>
dependency>
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-webartifactId>
dependency>
dependency>
groupId>org.projectlombokgroupId>
artifactId>lombokartifactId>
dependency>
dependency>
groupId>com.baomidougroupId>
artifactId>mybatis-plus-boot-starterartifactId>
version>3.4.1version>
optional>trueoptional>
dependency>
dependency>
groupId>mysqlgroupId>
artifactId>mysql-connector-javaartifactId>
scope>runtimescope>
optional>trueoptional>
dependency>
dependency>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-starter-testartifactId>
scope>testscope>
dependency>
dependency>
groupId>com.google.protobufgroupId>
artifactId>protobuf-javaartifactId>
version>3.8.0version>
dependency>
dependency>
groupId>org.eclipse.pahogroupId>
artifactId>org.eclipse.paho.client.mqttv3artifactId>
version>1.2.0version>
dependency>
dependency>
groupId>bouncycastlegroupId>
artifactId>bcprov-jdk15artifactId>
version>140version>
dependency>
dependency>
groupId>com.alibabagroupId>
artifactId>fastjsonartifactId>
version>1.2.78version>
dependency>
dependencies>
build>
plugins>
plugin>
groupId>org.springframework.bootgroupId>
artifactId>spring-boot-maven-pluginartifactId>
plugin>
plugins>
build>
project>
五、运行项目
前面运行的demo中,运行的是MQClient.java文件中的main方法,而main方法中的内容为:
public static void main(String[] args) {
MqClient mqClient = new MqClient();
mqClient.connect();
}
demo运行的时候一直在接收onenet消息队列MQ生产的消息,SpringBoot要达到这种一直运行着的效果,将调用方法放入启动类最容易实现,代码如下:
package com.yuyun;
import com.yuyun.mq.MqClient;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author hyh
*/
@MapperScan("com.yuyun.mapper")
@SpringBootApplication
public class SpringbootOneNetApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootOneNetApplication.class, args);
MqClient mqClient = new MqClient();
mqClient.connect();
}
}
启动SpringBoot项目,出现如下图所示的效果就算运行成功了
此时运行linux平台的模拟设备,其自动向onenet平台发送消息,onenet平台又会通过MQ来发送消息,SpringBoot项目接收到消息:
六、数据存入数据库
1、新建一个Service用来处理消息
接收到的消息是json格式的,解析这个格式的数据我用的是alibaba的fastjson。当messageType为deviceDatapoint
是数据点消息,deviceLifeCycle
是设备生命周期事件。
package com.yuyun.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.yuyun.dto.DeviceDataDTO;
import com.yuyun.dto.DeviceEventDTO;
import com.yuyun.service.DeviceDataService;
import com.yuyun.service.DeviceEventService;
import com.yuyun.service.MessageProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
/**
* @author hyh
*/
@Slf4j
@Service
public class MessageProcessServiceImpl implements MessageProcessService {
@Autowired
private DeviceEventService deviceEventService;
@Autowired
private DeviceDataService deviceDataService;
@Override
public void messageProcess(String body) {
JSONObject obj = JSONObject.parseObject(body);
//产品信息
JSONObject product = obj.getJSONObject("sysProperty");
//产品id
String productId = product.getString("productId");
// 消息类型
String messageType = product.getString("messageType");
//数据体
JSONObject appProperty = obj.getJSONObject("appProperty");
//数据上传时间
Date time = new Date(appProperty.getLong("dataTimestamp"));
//设备id
String deviceId = appProperty.getString("deviceId");
// 生命周期事件
if ("deviceLifeCycle".equals(messageType)){
JSONObject bodyJson = obj.getJSONObject("body");
String event = bodyJson.getString("event");
DeviceEventDTO deviceEvent = new DeviceEventDTO();
deviceEvent.setProductId(productId);
deviceEvent.setDeviceId(deviceId);
deviceEvent.setDateTime(time);
deviceEvent.setEvent(event);
// 存储到数据库
deviceEventService.save(deviceEvent);
}
// 数据点消息
else if ("deviceDatapoint".equals(messageType)){
// 数据流名称
String dataStream = appProperty.getString("datastream");
Double dataValue = obj.getDouble("body");
DeviceDataDTO deviceDataDTO = new DeviceDataDTO();
deviceDataDTO.setProductId(productId);
deviceDataDTO.setDeviceId(deviceId);
deviceDataDTO.setDataStream(dataStream);
deviceDataDTO.setDataValue(dataValue);
deviceDataDTO.setDateTime(time);
// 存储到数据库
deviceDataService.save(deviceDataDTO);
} else {
log.error("未知消息类型的数据:" + body);
}
}
}
2、调用该service方法
demo中提供了一个消息消费方法,PushCallback
类中的messageArrived
方法:
就在这里调用处理消息的service就行
(1)在PushCallback
和MqClient
类上面加上注解@Component
(2)PushCallback
类中引入MessageProcessService
@Autowired
protected MessageProcessService messageProcessService;
private static PushCallback pushCallback;
(3)添加代码
/**
* 通过PostConstruct实现初始化bean之前进行的操作
*/
@PostConstruct
public void init() {
pushCallback = this;
pushCallback.messageProcessService = this.messageProcessService;
}
(4)调用service类的数据处理方法
// 消息处理,存储到数据库
pushCallback.messageProcessService.messageProcess(body);
最终代码如下:
package com.yuyun.mq;
import com.yuyun.service.MessageProcessService;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.logging.Logger;
@Component
public class PushCallback implements MqttCallback {
@Autowired
protected MessageProcessService messageProcessService;
private static PushCallback pushCallback;
private IMqttAsyncClient Client;
private static final Logger logger = Logger.getLogger(PushCallback.class.getCanonicalName());
private MqClient mqClient;
private int reConnTimes = 0;
/**
* 通过PostConstruct实现初始化bean之前进行的操作
*/
@PostConstruct
public void init() {
pushCallback = this;
pushCallback.messageProcessService = this.messageProcessService;
}
public PushCallback(MqClient client) {
mqClient = client;
}
@Override
public void connectionLost(Throwable cause) {
logger.info("connect is losted,and try to reconnect,cause = " + cause.getMessage() );
cause.printStackTrace();
while(true){
if(mqClient.reConnect()){
break;
}
try {
//前20次每秒重连一次
if(reConnTimes++ > 20){
Thread.sleep(1000);
}else{//超过20次后没10s重连一次
Thread.sleep(10000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
/**
* 说明:paho 本身有个bug,即此函数接口抛出异常都会回调到connectionLost()的接口,故需要在此函数中用try catch包起来处理,
* 确保无异常抛出。
* */
public void messageArrived(String topic, MqttMessage message) {
try {
byte[] payload = message.getPayload();
OnenetMq.Msg obj = OnenetMq.Msg.parseFrom(payload);
long at = obj.getTimestamp();
long msgid = obj.getMsgid();
String body = new String(obj.getData().toByteArray());
SimpleDateFormat slf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String time = slf.format(at);
logger.info("time = " + time +
",msg id: " + msgid +
", body: " + body);
// 消息处理,存储到数据库
pushCallback.messageProcessService.messageProcess(body);
}catch(Exception e){
logger.info("messageArrived phrase exception");
}finally {
if(mqClient.getManualAcks()){
mqClient.messageArrivedComplete(message);
}
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Client = token.getClient();
}
}
3、运行项目
重新运行项目,MQ会把项目停止时没有接收的数据一股脑地传过来,打开数据库后就可以看到数据
此时因为没有收到设备生命周期事件,device_event表中的数据为空
将linux平台的模拟设备关闭运行几次,就收到了生命周期事件数据了:
4、打包成jar包运行
将springboot项目打包成jar包运行:
demo地址:https://gitee.com/hyh17808770899/spring-boot/tree/master/springboot-onenet