Spring+mqtt 搭建物联网平台服务端

服务端采用 Spring Boot、JPA、MySQL 作为基本框架,同时接入了 EMQ、JWT、微信认证、Lombok 等组件。

一、框架搭建

使用 IDEA 创建项目,选择 Spring Initializr 初始化 Spring Boot 项目,然后勾选 JPA、Lombok、Spring Web 依赖,一路 Next,这样一个基本的 Spring Boot 项目就搭建起来了。(此处原文为配图)

 

二、EMQ接入

1. 引入JAR包


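<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>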

2.application.yml添加配置文件

将 host-url 替换为我们部署的 EMQ 的地址,端口号默认为 1883。

# MQTT client settings read by MqttConfiguration through MqttProperties.
# NOTE(review): MqttProperties is not shown here — presumably bound with prefix "spring.mqtt"; confirm.
spring:
  mqtt:
    # Broker credentials; passed to MqttConnectOptions.setUserName / setPassword.
    username: admin
    mqpassword: admin
    # Broker URI handed to MqttConnectOptions.setServerURIs.
    host-url: tcp://127.0.0.1:1883
    # Base client id; the inbound adapter appends "_inbound" to it, the outbound handler uses it as-is.
    client-id: server_client_${random.value}
    # Comma-separated topic list subscribed by the inbound adapter; also set as the outbound default topic.
    default-topic: $SYS/brokers/+/clients/#
    # Passed to the inbound adapter's setCompletionTimeout.
    completionTimeout: 3000
    # Passed to MqttConnectOptions.setKeepAliveInterval.
    keepAlive: 60

3.代码配置

3.1 MqttConfiguration类 

用来配置消息订阅和消息发布的配置类。

package com.eric.etcloud.common.configs;

import com.eric.etcloud.common.beans.MqttProperties;
import com.eric.etcloud.common.mqtt.MqttEvent;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * Spring Integration wiring for the MQTT broker: connection options, the inbound
 * (subscribing) adapter, the handler that turns received messages into
 * {@link MqttEvent}s, and the outbound (publishing) handler.
 *
 * @author yangrui
 * @date 2020-05-14
 */
@Configuration
@Slf4j
public class MqttConfiguration {
    /** Message header the inbound handler reads the source topic from. */
    private static final String HEADER_RECEIVED_TOPIC = "mqtt_receivedTopic";
    /** Message header the inbound handler reads the delivery QoS from. */
    private static final String HEADER_RECEIVED_QOS = "mqtt_receivedQos";

    @Autowired
    private MqttProperties mqttProperties;

    /** Used to re-publish every received MQTT message as an application event. */
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    /**
     * Builds the broker connection options from {@link MqttProperties}.
     *
     * @return options carrying user name, password, server URI and keep-alive interval
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(mqttProperties.getUsername());
        mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getHostUrl()});
        // The previous hard-coded setKeepAliveInterval(2) was overwritten right away, so only
        // the configured value is applied.
        mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
        return mqttConnectOptions;
    }

    /**
     * @return client factory shared by the inbound adapter and the outbound handler
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * @return channel that carries messages from the inbound adapter to {@link #handler()}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Configures the subscribing client and the topics it listens to.
     *
     * @return inbound adapter subscribed to the configured default topics plus
     *         {@code +/+/client} and {@code +/+/web}
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId() + "_inbound", mqttClientFactory(),
                        mqttProperties.getDefaultTopic().split(","));
        adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout()));
        adapter.setConverter(new DefaultPahoMessageConverter());
        // Device and web topics are always subscribed, in addition to the configured ones.
        adapter.addTopic("+/+/client");
        adapter.addTopic("+/+/web");
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * Handles every message arriving on {@code mqttInputChannel} by publishing an
     * {@link MqttEvent}; business processing happens in the event listeners, not here.
     *
     * @return handler bound to {@code mqttInputChannel}
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                Object topicHeader = message.getHeaders().get(HEADER_RECEIVED_TOPIC);
                if (topicHeader == null) {
                    // Without a topic no listener condition can match; drop instead of failing with an NPE.
                    log.warn("Ignoring message without {} header: {}", HEADER_RECEIVED_TOPIC, message.getPayload());
                    return;
                }
                String topic = topicHeader.toString();
                Object qos = message.getHeaders().get(HEADER_RECEIVED_QOS);
                eventPublisher.publishEvent(new MqttEvent(this, topic, message.getPayload().toString()));
                log.info("topic:{} Qos:{} message:{}", topic, qos, message.getPayload());
            }
        };
    }

    /**
     * Outbound handler for {@code mqttOutboundChannel}. Publishing and consuming can share
     * the same {@link MqttPahoClientFactory}.
     *
     * @return asynchronous publishing handler whose default topic is the configured default topic
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId(), mqttClientFactory());
        // Async mode: sending a message does not block the caller.
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
        return messageHandler;
    }

    /**
     * @return channel that carries outgoing messages to {@link #mqttOutbound()}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

}

3.2 JobListener

监听 MqttConfiguration 中发布的 MqttEvent 事件,并按 topic 对不同消息通道的消息分别处理。

package com.eric.etcloud.common.mqtt;


import com.eric.etcloud.common.utils.CommonData;
import com.eric.etcloud.entity.*;
import com.eric.etcloud.service.*;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 *  触发event topic 事件
 * @author yangrui
 * @date 2020年5月23日
 */
@Slf4j
@Component
public class JobListener {

    @Autowired
    DeviceService deviceService;

    @Autowired
    ProductService productService;

    @Autowired
    MqttGateway gateway;

    @Autowired
    ClienthisService clienthisService;

    @Autowired
    NodehisService nodehisService;

    @Autowired
    TriggerService triggerService;

    @Autowired
    TriggerNodeService triggerNodeService;

    @Autowired
    WarnRecordService warnRecordService;

    @Autowired
    WarnInfoService warnInfoService;

    @Value("${platform.env}")
    String env;

    /**
     * 监听topic
     * @param mqttEvent
     */
    @EventListener(condition = "#mqttEvent.topic.endsWith('client')")
    public void onEmqttCall1(MqttEvent mqttEvent){
        String topic = mqttEvent.getTopic();eturn;
        }
        String[] arr = topic.split("/");
        List modelDevices = deviceService.findBySn(arr[1]);
        if(modelDevices.size()!=1){
            System.out.println("设备不存在");
            return;
        }
        if(modelDevices.get(0).getIsNodeDb() == 1){
            //存储日志
            nodehisService.save(new ModelNodehis("",arr[1],topic,mqttEvent.getMessage(),new Timestamp(System.currentTimeMillis())));
        }
        Gson gson = new Gson();
        Map content = gson.fromJson(mqttEvent.getMessage(),Map.class);
        //触发器告警
        List triggers = triggerService.findByProductId(arr[0]);
       
    }

    /**
     * 监听topic
     * @param mqttEvent
     */
    @EventListener(condition = "#mqttEvent.topic.startsWith('$SYS/brokers/')")
    public void onEmqttCall2(MqttEvent mqttEvent){
        String topic = mqttEvent.getTopic();
     
        Gson gson = new Gson();
        EmqClient.ClientInfo clientInfo = gson.fromJson(mqttEvent.getMessage(), EmqClient.ClientInfo.class);
        List modelDevices = deviceService.findBySn(clientInfo.getClientid());
        if(topic.endsWith("/connected")){
            //自动注册
            if(modelDevices.size() == 0){
                ModelProduct product = productService.findById(clientInfo.getUsername());
                deviceService.save(new ModelDevice(clientInfo.getUsername(),clientInfo.getClientid(),product.getEid(),product.getUserId(),new Timestamp(System.currentTimeMillis())));
                //刷新前端设备树
                Map map = new HashMap();
                map.put("productId",clientInfo.getUsername());
                map.put("sn",clientInfo.getClientid());
                gateway.sendToMqtt(env + "server_to_web/refresh",gson.toJson(map));
            }
            //存储连接日志
            if(modelDevices.size() == 1 && modelDevices.get(0).getIsConnDb() == 1){
                clienthisService.save(new ModelClienthis(clientInfo.getClientid(),clientInfo.getUsername(),new Timestamp(System.currentTimeMillis()),1));
            }
        }else{
            //存储连接日志
            if(modelDevices.size() == 1 && modelDevices.get(0).getIsConnDb() == 1){
                clienthisService.save(new ModelClienthis(clientInfo.getClientid(),clientInfo.getUsername(),new Timestamp(System.currentTimeMillis()),0));
            }
        }

    }

    /**
     * 监听topic
     * @param mqttEvent
     */
    @EventListener(condition = "#mqttEvent.topic.equals('device')")
    public void onEmqttCallT(MqttEvent mqttEvent){

        log.info("接收到消11111111111:"+mqttEvent.getMessage());

    }

}

 

3.3 MqttEvent

topic事件类

package com.eric.etcloud.common.mqtt;

import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;

/**
 * Application event carrying one received MQTT message (topic plus payload text).
 * Instances are immutable once constructed; getters are generated by Lombok.
 *
 * @author yangrui
 * @date 2020-05-23
 */
@Getter
public class MqttEvent extends ApplicationEvent {

    /**
     * Topic the message was received on.
     */
    private final String topic;
    /**
     * Message payload as text.
     */
    private final String message;

    /**
     * @param source  object that publishes the event
     * @param topic   topic the message was received on
     * @param message message payload as text
     */
    public MqttEvent(Object source, String topic, String message) {
        super(source);
        this.topic = topic;
        this.message = message;
    }

}

3.4 MqttGateway

消息发送接口

package com.eric.etcloud.common.mqtt;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * Messaging gateway for publishing to MQTT. Every call is sent to the
 * {@code mqttOutboundChannel} request channel.
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    /**
     * Sends a payload without a topic header.
     * NOTE(review): presumably published to the outbound handler's default topic — confirm.
     *
     * @param data message payload
     */
    void sendToMqtt(String data);

    /**
     * Sends a payload with the given topic set as the {@code MqttHeaders.TOPIC} header.
     *
     * @param topic   value of the topic header
     * @param payload message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a payload with both the topic and QoS headers set.
     *
     * @param topic   value of the {@code MqttHeaders.TOPIC} header
     * @param qos     value of the {@code MqttHeaders.QOS} header
     * @param payload message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

三、微信接入

1.引入JAR包


<dependency>
    <groupId>com.github.binarywang</groupId>
    <artifactId>weixin-java-miniapp</artifactId>
    <version>3.8.0</version>
</dependency>

2.配置文件yml

# WeChat mini-program settings, bound to WxMaProperties (prefix "wx.miniapp").
# All values below are placeholders to be replaced with your own.
wx:
  miniapp:
    # One entry per mini-program; WxMaConfiguration builds one service and one router per appid.
    configs:
      - appid: yourappid
        secret: youarsecret
        # Token and EncodingAESKey of the mini-program's message-server configuration.
        token: yourtoken
        aesKey: youraeskey
        # Message format: XML or JSON.
        msgDataFormat: JSON

3.代码集成

3.1 WxMaProperties

读取微信yml中的配置文件

package com.eric.etcloud.common.configs;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

import java.util.List;

/**
 * Configuration properties bound from the {@code wx.miniapp} prefix: the list of
 * mini-program configurations. Accessors are generated by Lombok.
 *
 * @author Binary Wang
 */
@Data
@ConfigurationProperties(prefix = "wx.miniapp")
public class WxMaProperties {

    /**
     * One entry per configured mini-program. The element type must be declared so that
     * the binder creates {@link Config} instances and callers can read typed fields.
     */
    private List<Config> configs;

    /**
     * Settings of a single mini-program.
     */
    @Data
    public static class Config {
        /**
         * The mini-program's appid.
         */
        private String appid;

        /**
         * The mini-program's secret.
         */
        private String secret;

        /**
         * Token from the mini-program's message-server configuration.
         */
        private String token;

        /**
         * EncodingAESKey from the mini-program's message-server configuration.
         */
        private String aesKey;

        /**
         * Message format, XML or JSON.
         */
        private String msgDataFormat;
    }

}

3.2 WxMaConfiguration

微信配置中心

package com.eric.etcloud.common.configs;

import cn.binarywang.wx.miniapp.api.WxMaService;
import cn.binarywang.wx.miniapp.api.impl.WxMaServiceImpl;
import cn.binarywang.wx.miniapp.bean.WxMaKefuMessage;
import cn.binarywang.wx.miniapp.bean.WxMaTemplateData;
import cn.binarywang.wx.miniapp.bean.WxMaTemplateMessage;
import cn.binarywang.wx.miniapp.config.impl.WxMaDefaultConfigImpl;
import cn.binarywang.wx.miniapp.message.WxMaMessageHandler;
import cn.binarywang.wx.miniapp.message.WxMaMessageRouter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import me.chanjar.weixin.common.bean.result.WxMediaUploadResult;
import me.chanjar.weixin.common.error.WxErrorException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * @author Binary Wang
 */
@Configuration
@EnableConfigurationProperties(WxMaProperties.class)
public class WxMaConfiguration {
    private WxMaProperties properties;

    private static Map routers = Maps.newHashMap();
    private static Map maServices = Maps.newHashMap();

    @Autowired
    public WxMaConfiguration(WxMaProperties properties) {
        this.properties = properties;
    }

    public static WxMaService getMaService(String appid) {
        WxMaService wxService = maServices.get(appid);
        if (wxService == null) {
            throw new IllegalArgumentException(String.format("未找到对应appid=[%s]的配置,请核实!", appid));
        }

        return wxService;
    }

    public static WxMaMessageRouter getRouter(String appid) {
        return routers.get(appid);
    }

    @PostConstruct
    public void init() {
        List configs = this.properties.getConfigs();
        if (configs == null) {
            throw new RuntimeException("大哥,拜托先看下项目首页的说明(readme文件),添加下相关配置,注意别配错了!");
        }

        maServices = configs.stream()
            .map(a -> {
                WxMaDefaultConfigImpl config = new WxMaDefaultConfigImpl();
                config.setAppid(a.getAppid());
                config.setSecret(a.getSecret());
                config.setToken(a.getToken());
                config.setAesKey(a.getAesKey());
                config.setMsgDataFormat(a.getMsgDataFormat());

                WxMaService service = new WxMaServiceImpl();
                service.setWxMaConfig(config);
                routers.put(a.getAppid(), this.newRouter(service));
                return service;
            }).collect(Collectors.toMap(s -> s.getWxMaConfig().getAppid(), a -> a));
    }

    private WxMaMessageRouter newRouter(WxMaService service) {
        final WxMaMessageRouter router = new WxMaMessageRouter(service);
        router
            .rule().handler(logHandler).next()
            .rule().async(false).content("模板").handler(templateMsgHandler).end()
            .rule().async(false).content("文本").handler(textHandler).end()
            .rule().async(false).content("图片").handler(picHandler).end()
            .rule().async(false).content("二维码").handler(qrcodeHandler).end();
        return router;
    }

    private final WxMaMessageHandler templateMsgHandler = (wxMessage, context, service, sessionManager) -> {
        service.getMsgService().sendTemplateMsg(WxMaTemplateMessage.builder()
            .templateId("此处更换为自己的模板id")
            .formId("自己替换可用的formid")
            .data(Lists.newArrayList(
                new WxMaTemplateData("keyword1", "339208499", "#173177")))
            .toUser(wxMessage.getFromUser())
            .build());
        return null;
    };

    private final WxMaMessageHandler logHandler = (wxMessage, context, service, sessionManager) -> {
        System.out.println("收到消息:" + wxMessage.toString());
        service.getMsgService().sendKefuMsg(WxMaKefuMessage.newTextBuilder().content("收到信息为:" + wxMessage.toJson())
            .toUser(wxMessage.getFromUser()).build());
        return null;
    };

    private final WxMaMessageHandler textHandler = (wxMessage, context, service, sessionManager) -> {
        service.getMsgService().sendKefuMsg(WxMaKefuMessage.newTextBuilder().content("回复文本消息")
            .toUser(wxMessage.getFromUser()).build());
        return null;
    };

    private final WxMaMessageHandler picHandler = (wxMessage, context, service, sessionManager) -> {
        try {
            WxMediaUploadResult uploadResult = service.getMediaService()
                .uploadMedia("image", "png",
                    ClassLoader.getSystemResourceAsStream("tmp.png"));
            service.getMsgService().sendKefuMsg(
                WxMaKefuMessage
                    .newImageBuilder()
                    .mediaId(uploadResult.getMediaId())
                    .toUser(wxMessage.getFromUser())
                    .build());
        } catch (WxErrorException e) {
            e.printStackTrace();
        }

        return null;
    };

    private final WxMaMessageHandler qrcodeHandler = (wxMessage, context, service, sessionManager) -> {
        try {
            final File file = service.getQrcodeService().createQrcode("123", 430);
            WxMediaUploadResult uploadResult = service.getMediaService().uploadMedia("image", file);
            service.getMsgService().sendKefuMsg(
                WxMaKefuMessage
                    .newImageBuilder()
                    .mediaId(uploadResult.getMediaId())
                    .toUser(wxMessage.getFromUser())
                    .build());
        } catch (WxErrorException e) {
            e.printStackTrace();
        }

        return null;
    };

}

3.3 WxPortalController

接受微信认证服务器数据,此处注意需要在微信小程序平台【开发】【开发设置】【消息推送】中配置一下,关于小程序的详细配置,会在后面的章节单独详细说明,此处仅配置认证接口。

(此处原文为配图)

package com.eric.etcloud.controller;

import cn.binarywang.wx.miniapp.api.WxMaService;
import cn.binarywang.wx.miniapp.bean.WxMaMessage;
import cn.binarywang.wx.miniapp.constant.WxMaConstants;
import com.eric.etcloud.common.annotation.JwtIgnore;
import com.eric.etcloud.common.configs.WxMaConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;

import java.util.Objects;

/**
 * Endpoint called by the WeChat server for one mini-program ({@code appid} path
 * variable): GET performs the server-verification handshake, POST receives pushed messages.
 *
 * @author Binary Wang
 */
@RestController
@RequestMapping("/wx/portal/{appid}")
public class WxPortalController {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Server verification: returns {@code echostr} when the signature is valid.
     *
     * @param appid     appid of the mini-program being verified
     * @param signature signature sent by WeChat
     * @param timestamp timestamp used in the signature
     * @param nonce     nonce used in the signature
     * @param echostr   string to echo back on success
     * @return {@code echostr} on a valid signature, otherwise the text {@code 非法请求}
     * @throws IllegalArgumentException if any parameter is blank or the appid is not configured
     */
    @GetMapping(produces = "text/plain;charset=utf-8")
    @JwtIgnore
    public String authGet(@PathVariable String appid,
                          @RequestParam(name = "signature", required = false) String signature,
                          @RequestParam(name = "timestamp", required = false) String timestamp,
                          @RequestParam(name = "nonce", required = false) String nonce,
                          @RequestParam(name = "echostr", required = false) String echostr) {
        // The leading "\n" escape had been reduced to a literal 'n' in the published listing.
        this.logger.info("\n接收到来自微信服务器的认证消息:signature = [{}], timestamp = [{}], nonce = [{}], echostr = [{}]",
            signature, timestamp, nonce, echostr);
        if (StringUtils.isAnyBlank(signature, timestamp, nonce, echostr)) {
            throw new IllegalArgumentException("请求参数非法,请核实!");
        }

        final WxMaService wxService = WxMaConfiguration.getMaService(appid);

        if (wxService.checkSignature(timestamp, nonce, signature)) {
            return echostr;
        }

        return "非法请求";
    }

    /**
     * Receives a pushed message, decodes it according to the configured data format
     * (JSON or XML) and the {@code encrypt_type} (plain or {@code aes}), and routes it.
     *
     * @return the text {@code success} once the message has been handed to the router
     * @throws RuntimeException if {@code encrypt_type} is neither blank nor {@code aes}
     */
    @PostMapping(produces = "application/xml; charset=UTF-8")
    @JwtIgnore
    public String post(@PathVariable String appid,
                       @RequestBody String requestBody,
                       @RequestParam(name = "msg_signature", required = false) String msgSignature,
                       @RequestParam(name = "encrypt_type", required = false) String encryptType,
                       @RequestParam(name = "signature", required = false) String signature,
                       @RequestParam("timestamp") String timestamp,
                       @RequestParam("nonce") String nonce) {
        this.logger.info("\n接收微信请求:[msg_signature=[{}], encrypt_type=[{}], signature=[{}]," +
                " timestamp=[{}], nonce=[{}], requestBody=[\n{}\n] ",
            msgSignature, encryptType, signature, timestamp, nonce, requestBody);

        final WxMaService wxService = WxMaConfiguration.getMaService(appid);

        final boolean isJson = Objects.equals(wxService.getWxMaConfig().getMsgDataFormat(),
            WxMaConstants.MsgDataFormat.JSON);
        if (StringUtils.isBlank(encryptType)) {
            // Plain-text message.
            WxMaMessage inMessage;
            if (isJson) {
                inMessage = WxMaMessage.fromJson(requestBody);
            } else {
                inMessage = WxMaMessage.fromXml(requestBody);
            }

            this.route(inMessage, appid);
            return "success";
        }

        if ("aes".equals(encryptType)) {
            // AES-encrypted message.
            WxMaMessage inMessage;
            if (isJson) {
                inMessage = WxMaMessage.fromEncryptedJson(requestBody, wxService.getWxMaConfig());
            } else {
                inMessage = WxMaMessage.fromEncryptedXml(requestBody, wxService.getWxMaConfig(),
                    timestamp, nonce, msgSignature);
            }

            this.route(inMessage, appid);
            return "success";
        }

        throw new RuntimeException("不可识别的加密类型:" + encryptType);
    }

    /**
     * Hands the message to the router of the given appid; routing failures are logged, not rethrown.
     */
    @JwtIgnore
    private void route(WxMaMessage message, String appid) {
        try {
            WxMaConfiguration.getRouter(appid).route(message);
        } catch (Exception e) {
            this.logger.error(e.getMessage(), e);
        }
    }

}

3.4 WxApiController

自定义接口,用来自定义用户登录的操作

 

package com.eric.etcloud.controller;

import cn.binarywang.wx.miniapp.api.WxMaService;
import cn.binarywang.wx.miniapp.bean.WxMaJscode2SessionResult;
import com.eric.etcloud.common.CommonController;
import com.eric.etcloud.common.annotation.JwtIgnore;
import com.eric.etcloud.common.configs.Audience;
import com.eric.etcloud.common.configs.WxMaConfiguration;
import com.eric.etcloud.common.configs.WxMaProperties;
import com.eric.etcloud.common.response.Result;
import com.eric.etcloud.common.utils.JwtTokenUtil;
import com.eric.etcloud.entity.ModelUser;
import com.eric.etcloud.service.UserService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import me.chanjar.weixin.common.error.WxErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Custom WeChat endpoints: mini-program login (code exchange) and JWT expiry check.
 */
@RestController
@RequestMapping("/wxapi")
@Api(value="微信接口",tags={"微信接口"})
public class WxApiController extends CommonController {
	/** appid of the mini-program whose service is used for login. */
	private static final String MINIAPP_APPID = "wx42ac48883a975c48";

	// NOTE(review): never assigned or read in this class.
	private WxMaProperties properties;
	private static Logger logger = LoggerFactory.getLogger(WxApiController.class);

	@Autowired
	private Audience audience;

	@Autowired
	UserService userService;

	/**
	 * Exchanges a mini-program login code for the WeChat session and reports whether
	 * the openid is bound to a user.
	 *
	 * @param response the HTTP response (not used by the visible code)
	 * @param code     login code obtained by the mini-program
	 * @return success result with {@code openid}, {@code sessionid} and {@code isBind},
	 *         or a failure result when the WeChat call fails
	 */
	@ApiOperation(value = "微信登录接口" ,notes = "微信登录接口" )
	@ApiImplicitParams({
			@ApiImplicitParam(name = "code" ,value = "微信小程序code" , required = true, dataType = "String")
	})
	@RequestMapping(value = "/login", method = { RequestMethod.GET  })
	@JwtIgnore
	public Result login(HttpServletResponse response, String code){
		logger.debug("登录接口, code={}", code);
		final WxMaService wxService = WxMaConfiguration.getMaService(MINIAPP_APPID);
		try {
			WxMaJscode2SessionResult session = wxService.getUserService().getSessionInfo(code);
			Map<String, Object> result = new HashMap<>();
			String openid = session.getOpenid();
			result.put("openid",openid);
			// NOTE(review): this returns the WeChat session_key to the client; WeChat's guidance is
			// to keep it on the server. Kept as-is because clients may rely on the field.
			result.put("sessionid",session.getSessionKey());
			List<ModelUser> users = userService.findByOpenid(openid);
			if(users.size()==1){
				ModelUser user = users.get(0);
				// The article removed the business code that used the bound user here.
			}
			result.put("isBind",false);
			return Result.SUCCESS(result);
		} catch (WxErrorException e) {
			logger.error(e.getMessage(), e);
			return Result.FAIL("服务器异常");
		}
	}

	/**
	 * Reports whether a token has expired.
	 *
	 * @param token the JWT to check
	 * @return success result wrapping the value of {@code JwtTokenUtil.isExpiration(token)}
	 */
	@ApiOperation(value = "获得TOKEN的状态" ,notes = "获得TOKEN的状态" )
	@ApiImplicitParams({
			@ApiImplicitParam(name = "token" ,value = "微信小程序token" , required = true, dataType = "String")
	})
	@RequestMapping(value = "/checkToken", method = { RequestMethod.GET  })
	@JwtIgnore
	public Result checkToken(String token){
		boolean isExpiration = JwtTokenUtil.isExpiration(token);
		return Result.SUCCESS(isExpiration);
	}

}

 

四、集成JWT

1.引入JAR包


<dependency>
    <groupId>io.jsonwebtoken</groupId>
    <artifactId>jjwt</artifactId>
    <version>0.9.0</version>
</dependency>

2.添加配置文件yml

# JWT settings, bound to the Audience class (prefix "audience").
audience:
  # Passed to setIssuer(...) by JwtTokenUtil.createJWT.
  clientId: 098f6bcd4621d373cade4e832627b4f6
  # Base64-encoded (not encrypted) signing secret; replace with your own.
  # NOTE(review): JwtTokenUtil parses tokens with its own hard-coded base64Security constant,
  # which differs from the value printed here — confirm the two are meant to match.
  base64Secret: MDk4ZjZiY2Q0NjIxZDM3DSDzMjYyN2I0ZjY=
  # Passed to setAudience(...) by JwtTokenUtil.createJWT.
  name: superuser
  # Token lifetime. createJWT adds this value to System.currentTimeMillis(), so it is in
  # milliseconds despite the name; 86400000 ms is one day.
  expiresSecond: 86400000

3.代码集成

3.1 Audience

读取配置文件信息

package com.eric.etcloud.common.configs;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * JWT settings bound from the {@code audience} configuration prefix.
 * Accessors are generated by Lombok.
 */
@Data
@ConfigurationProperties(prefix = "audience")
@Component
public class Audience {
    // Passed to setIssuer(...) in JwtTokenUtil.createJWT.
    private String clientId;
    // Base64-encoded secret; decoded into the HS256 signing key in JwtTokenUtil.createJWT.
    private String base64Secret;
    // Passed to setAudience(...) in JwtTokenUtil.createJWT.
    private String name;
    // Token lifetime; createJWT adds it to the current time in milliseconds, so despite the
    // name the value is treated as milliseconds.
    private int expiresSecond;
}

3.2 JwtTokenUtil

生成JWT和校验JWT

package com.eric.etcloud.common.utils;

import com.eric.etcloud.common.configs.Audience;
import com.eric.etcloud.common.exception.CustomException;
import com.eric.etcloud.common.response.ResultCode;
import com.eric.etcloud.entity.ModelUser;
import io.jsonwebtoken.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.spec.SecretKeySpec;
import javax.servlet.http.HttpServletRequest;
import javax.xml.bind.DatatypeConverter;
import java.security.Key;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * Static helpers for creating and verifying JWTs.
 *
 * ========================
 * Created with IntelliJ IDEA.
 * User:eric
 * Date:2020/6/17 17:24
 * Version: v1.0
 * ========================
 */
public class JwtTokenUtil {

    private static final Logger log = LoggerFactory.getLogger(JwtTokenUtil.class);

    public static final String AUTH_HEADER_KEY = "Authorization";

    public static final String TOKEN_PREFIX = "Bearer ";

    // NOTE(review): tokens are signed with Audience.getBase64Secret() in createJWT but verified
    // with this constant in getUserByWebToken/isExpiration; the two must hold the same secret.
    public static final String base64Security = "MDk4ZjZiY2Q0NjIxZDDSADANhZGU0ZTgzMjYyN2I0ZjY=";

    /**
     * Parses and verifies a JWT.
     *
     * @param jsonWebToken   the compact JWT
     * @param base64Security Base64-encoded signing secret
     * @return the token's claims
     * @throws CustomException with {@code PERMISSION_TOKEN_EXPIRED} when the token has expired,
     *                         or {@code PERMISSION_TOKEN_INVALID} for any other parsing failure
     */
    public static Claims parseJWT(String jsonWebToken, String base64Security) {
        try {
            return parseClaims(jsonWebToken, base64Security);
        } catch (ExpiredJwtException  eje) {
            log.error("===== Token过期 =====", eje);
            throw new CustomException(ResultCode.PERMISSION_TOKEN_EXPIRED);
        } catch (Exception e){
            log.error("===== token解析异常 =====", e);
            throw new CustomException(ResultCode.PERMISSION_TOKEN_INVALID);
        }
    }

    /**
     * Builds a signed (HS256) JWT.
     *
     * @param key      name of the custom claim
     * @param object   value of the custom claim
     * @param audience JWT settings (secret, issuer/audience values, lifetime)
     * @return the compact JWT
     * @throws CustomException with {@code PERMISSION_SIGNATURE_ERROR} when building or signing fails
     */
    public static String createJWT(String key, Object object, Audience audience) {
        try {
            SignatureAlgorithm signatureAlgorithm = SignatureAlgorithm.HS256;

            long nowMillis = System.currentTimeMillis();
            Date now = new Date(nowMillis);

            // Derive the signing key from the Base64-encoded secret.
            byte[] apiKeySecretBytes = DatatypeConverter.parseBase64Binary(audience.getBase64Secret());
            Key signingKey = new SecretKeySpec(apiKeySecretBytes, signatureAlgorithm.getJcaName());

            JwtBuilder builder = Jwts.builder().setHeaderParam("typ", "JWT")
                    // Non-sensitive object data may be placed in the claims.
                    .claim(key, object)
                    .setIssuer(audience.getClientId())
                    .setIssuedAt(now)
                    .setAudience(audience.getName())
                    .signWith(signatureAlgorithm, signingKey);
            // The configured lifetime is added to a millisecond timestamp, i.e. it is in milliseconds.
            int ttlMillis = audience.getExpiresSecond();
            if (ttlMillis >= 0) {
                long expMillis = nowMillis + ttlMillis;
                builder.setExpiration(new Date(expMillis))
                        // Validation fails before this instant.
                        .setNotBefore(now);
            }

            return builder.compact();
        } catch (Exception e) {
            log.error("签名失败", e);
            throw new CustomException(ResultCode.PERMISSION_SIGNATURE_ERROR);
        }
    }

    /**
     * Reads the current user from the request's {@code Authorization} header.
     *
     * @param request the HTTP request
     * @return a user populated with id, eid and usertype from the {@code tUser} claim, or
     *         {@code null} when the header is missing/too short or the token cannot be read
     * @author eric
     * @date 2020-06-15
     */
    public static ModelUser getUserByWebToken(HttpServletRequest request) {
        try {
            final String authHeader = request.getHeader(AUTH_HEADER_KEY);
            // NOTE(review): this writes the bearer token to the log; consider masking it.
            log.info("## authHeader= {}", authHeader);
            if (authHeader == null || authHeader.length() < TOKEN_PREFIX.length()) {
                log.warn("Missing or malformed {} header", AUTH_HEADER_KEY);
                return null;
            }

            // Strip the "Bearer " prefix.
            final String jsonWebToken = authHeader.substring(TOKEN_PREFIX.length());
            Claims claims = parseJWT(jsonWebToken, base64Security);
            Map<?, ?> map = claims.get("tUser", HashMap.class);
            ModelUser user = new ModelUser();
            user.setId(map.get("id").toString());
            user.setEid(map.get("eid").toString());
            user.setUsertype(Integer.parseInt(map.get("usertype").toString()));
            return user;
        } catch (Exception e) {
            log.error("Failed to resolve user from token", e);
            return null;
        }
    }

    /**
     * Reports whether a token has expired. Previously an expired token could never yield
     * {@code true}, because the parser rejects expired tokens and {@link #parseJWT} rethrew
     * that as a {@code CustomException}.
     *
     * @param token the compact JWT
     * @return {@code true} if the token is expired, {@code false} if it is valid and not expired
     * @throws CustomException with {@code PERMISSION_TOKEN_INVALID} when the token cannot be parsed
     */
    public static boolean isExpiration(String token) {
        try {
            Date expiration = parseClaims(token, base64Security).getExpiration();
            return expiration != null && expiration.before(new Date());
        } catch (ExpiredJwtException eje) {
            return true;
        } catch (Exception e) {
            log.error("===== token解析异常 =====", e);
            throw new CustomException(ResultCode.PERMISSION_TOKEN_INVALID);
        }
    }

    /** Parses and verifies the token, letting the parser's own exceptions propagate. */
    private static Claims parseClaims(String jsonWebToken, String base64Security) {
        return Jwts.parser()
                .setSigningKey(DatatypeConverter.parseBase64Binary(base64Security))
                .parseClaimsJws(jsonWebToken).getBody();
    }
}

3.3LoginConfiguration

配置拦截器

package com.eric.etcloud.common.configs;

import com.eric.etcloud.common.filters.LoginInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.servlet.handler.MappedInterceptor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Registers the login interceptor and the CORS mappings.
 *
 * @ProjectName: login interceptor
 * @ClassName: LoginConfiguration
 * @Description: registers and activates the custom interceptor configuration
 * @Author:eric
 * @Date:
 * @Version: 1.0
 */
@Configuration
public class LoginConfiguration implements WebMvcConfigurer {
    /**
     * Maps {@link LoginInterceptor} to all paths except the Swagger/API-doc paths and the
     * static front-end paths listed below.
     *
     * @return the mapped interceptor
     */
    @Bean
    public MappedInterceptor getMappedInterceptor() {
        LoginInterceptor loginInterceptor = new LoginInterceptor();
        // "/**" intercepts every request.
        String[] includePatterns = new String[]{"/**"};

        // Excluded paths, in order: Swagger/API-doc paths first, then the custom static paths.
        List<String> excludePatterns = new ArrayList<>(Arrays.asList(
                "/swagger-resources/**", "/webjars/**", "/v2/**", "/swagger-ui.html/**",
                "/api", "/api-docs", "/api-docs/**"));
        excludePatterns.add("/netgate-server/dist/*");
        excludePatterns.add("/netgate-server/");

        return new MappedInterceptor(includePatterns, excludePatterns.toArray(new String[0]), loginInterceptor);
    }

    /**
     * Enables CORS for every path.
     * NOTE(review): Spring Framework 5.3+ rejects {@code allowedOrigins("*")} combined with
     * {@code allowCredentials(true)} at runtime — confirm the framework version in use.
     *
     * @param registry the CORS registry to configure
     */
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
                .allowedOrigins("*")
                .allowCredentials(true)
                .allowedMethods("GET", "POST", "DELETE", "PUT", "PATCH", "OPTIONS", "HEAD")
                .maxAge(3600 * 24);
    }
}

3.4LoginInterceptor

实现拦截器

package com.eric.etcloud.common.filters;

import com.eric.etcloud.common.annotation.JwtIgnore;
import com.eric.etcloud.common.configs.Audience;
import com.eric.etcloud.common.exception.CustomException;
import com.eric.etcloud.common.response.ResultCode;
import com.eric.etcloud.common.utils.JwtTokenUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * @ProjectName: demo
 * @Package: com.demo.common.interceptor
 * @ClassName: LoginInterceptor
 * @Description: 登录请求拦截器
 * @Author:
 * @Date:
 * @Version: 1.0
 */
@Slf4j
public class LoginInterceptor implements HandlerInterceptor {
    @Autowired
    private Audience audience;

    /**
     * 在请求被处理之前调用
     * @param request
     * @param response
     * @param handler
     * @return
     * @throws Exception
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        // 忽略带JwtIgnore注解的请求, 不做后续token认证校验
        if (handler instanceof HandlerMethod) {
            HandlerMethod handlerMethod = (HandlerMethod) handler;
            JwtIgnore jwtIgnore = handlerMethod.getMethodAnnotation(JwtIgnore.class);
            if (jwtIgnore != null) {
                return true;
            }
        }

        String requestMethod = request.getMethod();
        if (requestMethod.contains("OPTIONS") || requestMethod.contains("options")) {
            return true;
        }

        // 获取请求头信息authorization信息
        final String authHeader = request.getHeader(JwtTokenUtil.AUTH_HEADER_KEY);
        log.info("## authHeader= {}", authHeader);

        if (StringUtils.isBlank(authHeader) || !authHeader.startsWith(JwtTokenUtil.TOKEN_PREFIX)) {
            log.info("### 用户未登录,请先登录 ###");
            throw new CustomException(ResultCode.USER_NOT_LOGGED_IN);
        }

        // 获取token
        final String token = authHeader.substring(7);

        if(audience == null){
            BeanFactory factory = WebApplicationContextUtils.getRequiredWebApplicationContext(request.getServletContext());
            audience = (Audience) factory.getBean("audience");
        }

        // 验证token是否有效--无效已做异常抛出,由全局异常处理后返回对应信息
        JwtTokenUtil.parseJWT(token, audience.getBase64Secret());
        return true;
    }

}

3.5 JwtIgnore

忽略JWT校验注解,比如上边的微信认证的接口我就使用了这个注解忽略jwt校验。

package com.eric.etcloud.common.annotation;

import java.lang.annotation.*;

/**
 * Marker annotation for handler methods that are exempt from JWT validation.
 *
 * <p>Retained at runtime so it can be looked up reflectively on the handler method.
 *
 * @author eric
 * @version 1.0
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface JwtIgnore {
}

3.5 UserController

登录创建JWT


/**
 * User endpoints; the part shown here is the login endpoint, which issues a JWT.
 */
@RestController
@RequestMapping("/user")
@Api(value="用户接口",tags={"user(用户)-增删改查;导入导出"})
public class UserController extends CommonController {

    @Autowired
    UserService userService;

    @Autowired
    private Audience audience;

    /**
     * Authenticates a user by e-mail and password and, on success, issues a JWT that
     * is returned both in the response header and in the response body.
     *
     * @param response servlet response that receives the token header
     * @param email    login e-mail
     * @param password clear-text password supplied by the client
     * @return success with the user info, or a failure result when the user cannot be
     *         found or the password does not match
     */
    @ApiOperation(value = "用户登录" ,notes = "用户登录")
    @ApiImplicitParams({
        @ApiImplicitParam(name = "email" ,value = "登录邮箱" , required = true, dataType = "String"),
        @ApiImplicitParam(name = "password" ,value = "密码" , required = true, dataType = "String")
    })
    @RequestMapping(value = "/login", method = { RequestMethod.POST })
    @JwtIgnore
    public Result login(
            HttpServletResponse response,
            @RequestParam(value="email", required=true) String email,
            @RequestParam(value="password", required=true) String password){
        ModelUser user = null;
        try {
            user = userService.findByEmail(email).get(0);
            // Recording the last login time is deliberately disabled for now.
        } catch (Exception e) {
            // Best effort: any lookup failure (including an empty result) is reported
            // to the caller as "user does not exist" below.
            e.printStackTrace();
        }
        if (user == null) {
            return Result.FAIL("用户不存在");
        }

        // Hash with an explicit charset so the digest does not depend on the host's
        // default encoding.
        // NOTE(security): unsalted MD5 is not a safe password hash; migrating to
        // bcrypt/scrypt/argon2 requires re-hashing stored passwords and is out of scope here.
        String md5_password = DigestUtils.md5DigestAsHex(
                password.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        if (!md5_password.equals(user.getPassword())) {
            return Result.FAIL("密码错误");
        }

        // Build the response object by round-tripping the entity through JSON.
        HttpUserInfoRes userRes = gson.fromJson(gson.toJson(user), HttpUserInfoRes.class);
        String power = userService.getPowerByUserid(user.getId());
        ModelEid modelEid = eidService.findById(user.getEid());
        userRes.setEidName(modelEid.getName());
        userRes.setPowers(power);

        String accessToken = JwtTokenUtil.createJWT("tUser", userRes, audience);
        // Expose the token in the response header as well as in the body.
        response.setHeader(JwtTokenUtil.AUTH_HEADER_KEY, JwtTokenUtil.TOKEN_PREFIX + accessToken);
        userRes.setLogintime(new Timestamp(System.currentTimeMillis()));
        userRes.setWebtoken(accessToken);

        logService.save(new ModelServicelog(CommonData.modeluser,NetNotes.info.toInteger(),user.getId(),user.getEid(),user.getUsername()+"登陆了系统"));
        return Result.SUCCESS(userRes);
    }

}

3.6 普通controller获取jwt中的用户数据

在CommonController中调用JwtTokenUtil解析token获得用户数据

Spring+mqtt 搭建物联网平台服务端

在普通Controller中就可以直接super.getUserid(request)获取用户信息

Spring+mqtt 搭建物联网平台服务端

五、集成定时任务

1.引入JAR包


<dependency>
	<groupId>org.quartz-scheduler</groupId>
	<artifactId>quartz</artifactId>
	<version>2.2.2</version>
</dependency>

2.代码集成

2.1 QuartzJobConfig

配置类

package com.eric.etcloud.common.configs;

import com.eric.etcloud.common.beans.ConfigNet;
import com.eric.etcloud.common.job.JobTiming;
import com.eric.etcloud.common.job.QuartzManager;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;

import javax.annotation.Resource;

/**
 * Registers the default timed job once the application context has been refreshed
 * and exposes a Quartz {@link Scheduler} bean.
 */
@Configuration
public class QuartzJobConfig implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private QuartzManager quartzManager;
    @Resource
    ConfigNet confignet;

    /**
     * Schedules the default hourly job and starts the scheduler.
     *
     * <p>The job list is hard-coded here; it could instead be loaded from a database
     * table.
     *
     * @param event the context-refreshed event
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        try {
            // A context may be refreshed more than once; scheduling the same job twice
            // makes Quartz throw, so only add it when its trigger is not yet known.
            if (quartzManager.notExists(confignet.getTriggerTimingName(), confignet.getTriggerTimingGroup())) {
                // "0 0 */1 * * ?" fires at the top of every hour.
                quartzManager.addJob(confignet.getTaskTimingName(), confignet.getTaskTimingGroup(),
                        confignet.getTriggerTimingName(), confignet.getTriggerTimingGroup(),
                        JobTiming.class, "0 0 */1 * * ?", null);
            }
            quartzManager.startJob();
            System.out.println("定时任务已经启动...");
        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

    /**
     * Provides the Quartz scheduler bean.
     *
     * @return the scheduler obtained from a {@link StdSchedulerFactory}
     * @throws SchedulerException if the scheduler cannot be created
     */
    @Bean
    public Scheduler scheduler() throws SchedulerException {
        SchedulerFactory schedulerFactoryBean = new StdSchedulerFactory();
        return schedulerFactoryBean.getScheduler();
    }
}

2.2 JobFactory

定时任务工厂

package com.eric.etcloud.common.job;

import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;

/**
 * Job factory that autowires each job instance after the parent class has created
 * it, so Spring beans can be injected into Quartz jobs.
 */
@Component
public class JobFactory extends AdaptableJobFactory {

    /** Bean factory used to autowire the job instance created by the parent class. */
    private final AutowireCapableBeanFactory factory;

    public JobFactory(AutowireCapableBeanFactory factory) {
        this.factory = factory;
    }

    /**
     * Creates the job instance and then autowires its dependencies.
     *
     * @param bundle the fired-trigger bundle passed in by Quartz
     * @return the autowired job instance
     * @throws Exception if the parent class fails to instantiate the job
     */
    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        Object jobInstance = super.createJobInstance(bundle);
        factory.autowireBean(jobInstance);
        return jobInstance;
    }
}

2.3 JobHeart

一个模拟的定时任务,这是一个心跳定时任务

package com.eric.etcloud.common.job;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;


/**
 * Sample heartbeat job: each execution only prints {@code beat}.
 */
public class JobHeart extends QuartzJobBean {

    /**
     * Executes one heartbeat tick.
     *
     * @param arg0 the Quartz execution context (unused)
     * @throws JobExecutionException declared by the parent contract; never thrown here
     */
    @Override
    protected void executeInternal(JobExecutionContext arg0) throws JobExecutionException {
        System.out.println("beat");
        // The real broadcast, netgateHandler.sendHeartBeatToAllManager(), is disabled in
        // this sample; re-enable it together with an injected NetgateHandler field.
    }

}

2.4 QuartzConfig

配置类

package com.eric.etcloud.common.job;

import org.quartz.Scheduler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

/**
 * Declares the {@link SchedulerFactoryBean} and installs the custom job factory on it.
 *
 * <p>No {@code Scheduler} bean is declared in this class; {@code QuartzJobConfig.scheduler()}
 * provides one.
 */
@Configuration
public class QuartzConfig {

    private final JobFactory jobFactory;

    public QuartzConfig(JobFactory jobFactory) {
        this.jobFactory = jobFactory;
    }

    /**
     * Creates the scheduler factory bean with the custom job factory set.
     *
     * @return the configured factory bean
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() {
        SchedulerFactoryBean schedulerFactory = new SchedulerFactoryBean();
        // Route job instantiation through the custom JobFactory.
        schedulerFactory.setJobFactory(jobFactory);
        return schedulerFactory;
    }
}

2.5 QuartzManager

定时任务Handle可以创建、删除定时任务

package com.eric.etcloud.common.job;

import java.util.Map;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.springframework.stereotype.Service;

/**
 * Thin facade over the Quartz {@link Scheduler} for adding, rescheduling, pausing,
 * resuming and removing cron-triggered jobs.
 */
@Service
public class QuartzManager {

    private final Scheduler scheduler;

    public QuartzManager(Scheduler scheduler){
        this.scheduler = scheduler;
    }

    /**
     * Starts the scheduler so that all registered jobs begin firing.
     *
     * @throws SchedulerException if the scheduler cannot be started
     */
    public void startJob() throws SchedulerException {
        scheduler.start();
    }

    /**
     * Registers a cron-triggered job and starts the scheduler if it has not been shut down.
     *
     * @param jobName           job name
     * @param jobGroupName      job group name
     * @param triggerName       trigger name
     * @param triggerGroupName  trigger group name
     * @param jobClass          job implementation class
     * @param cron              Quartz cron expression
     * @param params            optional job data copied into the job's data map; may be {@code null}
     * @throws RuntimeException wrapping any failure (invalid cron, duplicate job, scheduler error)
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public void addJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName, Class jobClass, String cron, Map params) {
        try {
            JobDetail job = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
            // Job parameters used to be dropped silently; pass them on when supplied.
            if (params != null) {
                job.getJobDataMap().putAll(params);
            }

            CronTrigger trigger = buildCronTrigger(triggerName, triggerGroupName, cron);
            scheduler.scheduleJob(job, trigger);

            if (!scheduler.isShutdown()) {
                scheduler.start();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Changes the cron expression of an existing trigger. Does nothing when the trigger
     * does not exist or already uses the given expression.
     *
     * @param triggerName       trigger name
     * @param triggerGroupName  trigger group name
     * @param cron              new Quartz cron expression
     * @throws RuntimeException wrapping any scheduler failure
     */
    public void modifyJobTime(String triggerName, String triggerGroupName, String cron) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
            Trigger existing = scheduler.getTrigger(triggerKey);
            if (existing == null) {
                return;
            }
            // Check the type before reading the expression: a non-cron trigger under
            // this key used to fail with a ClassCastException.
            boolean unchanged = existing instanceof CronTrigger
                    && ((CronTrigger) existing).getCronExpression().equalsIgnoreCase(cron);
            if (unchanged) {
                return;
            }
            scheduler.rescheduleJob(triggerKey, buildCronTrigger(triggerName, triggerGroupName, cron));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Removes a job together with its trigger.
     *
     * @param jobName           job name
     * @param jobGroupName      job group name
     * @param triggerName       trigger name
     * @param triggerGroupName  trigger group name
     * @throws RuntimeException wrapping any scheduler failure
     */
    public void removeJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);

            // Pause the trigger, detach it, then delete the job itself.
            scheduler.pauseTrigger(triggerKey);
            scheduler.unscheduleJob(triggerKey);
            scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Tells whether the given trigger is unknown to the scheduler.
     *
     * @param triggerName       trigger name
     * @param triggerGroupName  trigger group name
     * @return {@code true} when the trigger state is {@code NONE}
     * @throws RuntimeException wrapping any scheduler failure
     */
    public  Boolean notExists(String triggerName, String triggerGroupName) {
        try {
            return scheduler.getTriggerState(TriggerKey.triggerKey(triggerName, triggerGroupName)) == Trigger.TriggerState.NONE;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Describes a cron trigger as {@code time:<cron>,state:<state>}.
     *
     * @param triggerName       trigger name
     * @param triggerGroupName  trigger group name
     * @return the description, or an empty string when no cron trigger exists under that key
     * @throws SchedulerException on scheduler failure
     */
    public String getJobInfo(String triggerName, String triggerGroupName) throws SchedulerException {
        TriggerKey triggerKey = new TriggerKey(triggerName, triggerGroupName);
        Trigger trigger = scheduler.getTrigger(triggerKey);
        if (!(trigger instanceof CronTrigger)) {
            return "";
        }
        return String.format("time:%s,state:%s", ((CronTrigger) trigger).getCronExpression(),
                scheduler.getTriggerState(triggerKey).name());
    }

    /**
     * Pauses every job.
     *
     * @throws SchedulerException on scheduler failure
     */
    public void pauseAllJob() throws SchedulerException {
        scheduler.pauseAll();
    }

    /**
     * Pauses one job; does nothing when the job does not exist.
     *
     * @param triggerName       used as the job name, despite the parameter name
     * @param triggerGroupName  used as the job group name, despite the parameter name
     * @throws SchedulerException on scheduler failure
     */
    public void pauseJob(String triggerName, String triggerGroupName) throws SchedulerException {
        JobKey jobKey = new JobKey(triggerName, triggerGroupName);
        if (scheduler.getJobDetail(jobKey) == null) {
            return;
        }
        scheduler.pauseJob(jobKey);
    }

    /**
     * Resumes every job.
     *
     * @throws SchedulerException on scheduler failure
     */
    public void resumeAllJob() throws SchedulerException {
        scheduler.resumeAll();
    }

    /**
     * Resumes one job; does nothing when the job does not exist.
     *
     * @param triggerName       used as the job name, despite the parameter name
     * @param triggerGroupName  used as the job group name, despite the parameter name
     * @throws SchedulerException on scheduler failure
     */
    public void resumeJob(String triggerName, String triggerGroupName) throws SchedulerException {
        JobKey jobKey = new JobKey(triggerName, triggerGroupName);
        if (scheduler.getJobDetail(jobKey) == null) {
            return;
        }
        scheduler.resumeJob(jobKey);
    }

    /**
     * Deletes one job; does nothing when the job does not exist.
     *
     * @param triggerName       used as the job name, despite the parameter name
     * @param triggerGroupName  used as the job group name, despite the parameter name
     * @throws SchedulerException on scheduler failure
     */
    public void deleteJob(String triggerName, String triggerGroupName) throws SchedulerException {
        JobKey jobKey = new JobKey(triggerName, triggerGroupName);
        if (scheduler.getJobDetail(jobKey) == null) {
            return;
        }
        scheduler.deleteJob(jobKey);
    }

    /** Builds a cron trigger with the given identity that becomes active immediately. */
    private static CronTrigger buildCronTrigger(String triggerName, String triggerGroupName, String cron) {
        return TriggerBuilder.newTrigger()
                .withIdentity(triggerName, triggerGroupName)
                .startNow()
                .withSchedule(CronScheduleBuilder.cronSchedule(cron))
                .build();
    }

}

2.6 JobController

提供可调用的定时任务接口

package com.eric.etcloud.controller;

import com.eric.etcloud.common.CommonController;
import com.eric.etcloud.common.beans.ConfigNet;
import com.eric.etcloud.common.job.JobHeart;
import com.eric.etcloud.common.job.JobVersion;
import com.eric.etcloud.common.job.QuartzManager;
import com.eric.etcloud.common.response.Result;
import com.eric.etcloud.common.utils.CommonData;
import com.eric.etcloud.common.utils.CronDateUtil;
import com.eric.etcloud.common.utils.NetNotes;
import com.eric.etcloud.entity.ModelServicelog;
import com.eric.etcloud.entity.ModelTask;
import com.eric.etcloud.service.ServicelogService;
import com.eric.etcloud.service.TaskService;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.util.Date;

/**
 * REST endpoints for managing Quartz jobs: the sample heartbeat job and the scheduled
 * version-upgrade job.
 */
@RestController
@RequestMapping("/job")
public class JobController extends CommonController {
    private QuartzManager quartzManager;

    @Autowired
    ServicelogService logService;
    @Autowired
    TaskService taskService;
    @Resource
    ConfigNet confignet;

    public JobController (QuartzManager quartzManager) {
        this.quartzManager = quartzManager;
    }

    /**
     * Schedules the sample heartbeat job (every 10 seconds). Calling the endpoint again
     * while the job is already scheduled is a no-op instead of an error.
     */
    @RequestMapping("/heart")
    public void startHeart() {
        if (quartzManager.notExists("hearttriggername", "hearttriggerrgroup")) {
            quartzManager.addJob("heartjobname", "heartgroupname", "hearttriggername", "hearttriggerrgroup", JobHeart.class, "*/10 * * * * ?", null);
        }
    }

    /**
     * Replaces the scheduled version-upgrade job with a new schedule.
     *
     * @param request  current request, used to resolve the acting user
     * @param tasktype schedule kind: daily or one specific date
     * @param taskdata date part, used for the one-date schedule
     * @param tasktime time-of-day part
     * @param taskid   identifier stored with the task record
     * @return success with a description of the new schedule, or a failure result
     */
    @ApiOperation(value = "定时任务自动升级", notes = "定时任务自动升级")
    @ApiImplicitParams({
        @ApiImplicitParam(name = "tasktype",value = "tasktype",required = true,dataType = "Integer"),
        @ApiImplicitParam(name = "taskdata",value = "taskdata",required = false,dataType = "Date"),
        @ApiImplicitParam(name = "tasktime",value = "tasktime",required = true,dataType = "Date"),
        @ApiImplicitParam(name = "taskid",value = "taskid",required = true,dataType = "String")
    })
    @RequestMapping("/upgradeauto")
    public Result startQuartzJob(HttpServletRequest request, int tasktype, Date taskdata, Date tasktime, String taskid) {
        try {
            final String cron;
            final String message;
            if (tasktype == NetNotes.everyday.toInteger()) {
                cron = CronDateUtil.getTaskCron(tasktime);
                message = "开启了一个每天执行的定时升级版本任务" + tasktime;
            } else if (tasktype == NetNotes.oneday.toInteger()) {
                cron = CronDateUtil.getTaskCron(taskdata, tasktime);
                message = "开启了一个指定日期执行的定时升级版本任务" + tasktime;
            } else {
                // Unknown schedule kind: reject before touching the existing job.
                return Result.FAIL("指定定时任务失败" + tasktime);
            }

            // Drop the previous schedule, then persist and register the new one.
            // Previously both branches returned before this point, so the old job was
            // removed and the new one was never scheduled.
            quartzManager.removeJob(confignet.getTaskGradeName(), confignet.getTaskGradeGroup(), confignet.getTriggerGradeName(), confignet.getTriggerGradeGroup());
            taskService.save(new ModelTask(taskid,confignet.getTaskGradeGroup(),confignet.getTaskGradeName(),cron,tasktype,super.getUserid(request),taskdata,tasktime));
            quartzManager.addJob(confignet.getTaskGradeName(), confignet.getTaskGradeGroup(), confignet.getTriggerGradeName(), confignet.getTriggerGradeGroup(), JobVersion.class, cron, null);

            // Log only after the job has actually been scheduled.
            logService.save(new ModelServicelog(CommonData.modelversion,NetNotes.info.toInteger(),super.getUserid(request),super.getEid(request),message));
            System.out.println(message);
            return Result.SUCCESS(message);
        } catch (Exception e) {
            e.printStackTrace();
            return Result.FAIL("指定定时任务失败"+tasktime);
        }
    }

    /**
     * Returns the most recent task record serialized as JSON.
     *
     * @return success result carrying the JSON string
     */
    @GetMapping("/latesttask")
    public Result getLatestTask() {
        return Result.SUCCESS(gson.toJson(taskService.getLatestTask()));
    }

    /**
     * Removes the scheduled version-upgrade job.
     *
     * @return success or failure result with a message
     */
    @RequestMapping("/taskdelete")
    public Result deleteJob() {
        try {
            quartzManager.removeJob(confignet.getTaskGradeName(), confignet.getTaskGradeGroup(), confignet.getTriggerGradeName(), confignet.getTriggerGradeGroup());
            logService.save(new ModelServicelog(CommonData.modelversion,NetNotes.info.toInteger(),"","","删除了升级版本的定时任务"));
            System.out.println("删除了升级版本的定时任务");
        } catch (Exception e) {
            e.printStackTrace();
            logService.save(new ModelServicelog(CommonData.modelversion,NetNotes.error.toInteger(),"","","删除了升级版本的定时任务失败"));
            return Result.FAIL("删除了升级版本的定时任务失败");
        }
        return Result.SUCCESS("删除了升级版本的定时任务");
    }

    /**
     * Queries the scheduler for the version-upgrade trigger.
     *
     * @return a bare success result when the query works, a failure result when it throws
     */
    @RequestMapping("/taskinfo")
    public Result getQuartzJob() {
        try {
            // NOTE(review): the lookup result has never been returned to the caller;
            // confirm whether the endpoint should expose it.
            quartzManager.notExists(confignet.getTriggerGradeName(), confignet.getTriggerGradeGroup());
        } catch (Exception e) {
            e.printStackTrace();
            // The failure result used to be built and discarded; return it.
            return Result.FAIL();
        }
        return Result.SUCCESS();
    }
}

2.7 CronDateUtil

非常有用的工具类,可以将时间类型转为cron表达式,也可以将cron表达式转化为时间类型

package com.eric.etcloud.common.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Converts between {@link Date} values and Quartz cron expressions of the form
 * {@code ss mm HH dd MM ? yyyy}.
 */
public class CronDateUtil {
    private static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";
    private static final String TASK_DATE = " dd MM ? yyyy";
    private static final String TASK_TIME = "ss mm HH";

    /**
     * Builds a one-shot cron expression from a date and a time of day.
     *
     * @param date supplies day, month and year; skipped when {@code null}
     * @param time supplies seconds, minutes and hours; skipped when {@code null}
     * @return the time part followed by the date part (either may be absent)
     */
    public static String getTaskCron(Date date,Date time) {
        StringBuilder cron = new StringBuilder();
        if (time != null) {
            cron.append(render(TASK_TIME, time));
        }
        if (date != null) {
            cron.append(render(TASK_DATE, date));
        }
        return cron.toString();
    }

    /**
     * Builds a daily cron expression that fires at the given time of day.
     *
     * @param time supplies seconds, minutes and hours; skipped when {@code null}
     * @return the time part followed by {@code " * * ?"}
     */
    public static String getTaskCron(Date time) {
        String timePart = (time == null) ? "" : render(TASK_TIME, time);
        return timePart + " * * ?";
    }

    /**
     * Formats a date as a full cron expression.
     *
     * @param date the instant to convert
     * @return the cron expression, or an empty string when {@code date} is {@code null}
     */
    public static String getCron(final Date  date){
        if (date == null) {
            return "";
        }
        return render(CRON_DATE_FORMAT, date);
    }

    /**
     * Parses a full cron expression back into a date.
     *
     * @param cron expression in {@code ss mm HH dd MM ? yyyy} form
     * @return the parsed date, or {@code null} when {@code cron} is {@code null} or unparsable
     */
    public static Date getDate(final String cron) {
        if (cron == null) {
            return null;
        }
        try {
            return new SimpleDateFormat(CRON_DATE_FORMAT).parse(cron);
        } catch (ParseException e) {
            // Unparsable input is reported as null rather than as an exception.
            return null;
        }
    }

    /** Small manual demo of both conversion directions. */
    public static void main(String[] args) {
        Date now = new Date();
        System.out.println(now);
        System.out.println(CronDateUtil.getCron(now));

        String cron = "20 28 17 02 08 ? 2016";
        Date cronDate = CronDateUtil.getDate(cron);
        System.out.println("===================");
        System.out.println(cronDate.toString());
    }

    /** Formats {@code value} with a fresh formatter for {@code pattern}. */
    private static String render(String pattern, Date value) {
        return new SimpleDateFormat(pattern).format(value);
    }

}

六、集成HTTPClient

1.引入JAR包


<dependency>
	<groupId>org.apache.httpcomponents</groupId>
	<artifactId>httpclient</artifactId>
</dependency>

2.RestTemplateConfig

配置文件

package com.eric.etcloud.common.configs;
 
import org.apache.http.client.HttpClient;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLContextBuilder;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.DefaultResponseErrorHandler;
import org.springframework.web.client.RestTemplate;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
 
/**
 * Declares the application's {@link RestTemplate}, backed by a pooled Apache
 * HttpClient.
 *
 * @author yangrui
 */
@Configuration
public class RestTemplateConfig {

    /**
     * Builds the RestTemplate on top of the pooled request factory with the default
     * response error handler.
     *
     * @return the configured RestTemplate
     */
    @Bean
    public RestTemplate restTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.setRequestFactory(clientHttpRequestFactory());
        restTemplate.setErrorHandler(new DefaultResponseErrorHandler());
        return restTemplate;
    }

    /**
     * Builds the pooled HTTP/HTTPS request factory.
     *
     * @return the request factory; never {@code null}
     * @throws IllegalStateException if the SSL context cannot be initialised
     */
    @Bean
    public HttpComponentsClientHttpRequestFactory clientHttpRequestFactory() {
        try {
            HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
            // NOTE(security): this trust strategy accepts every certificate and the
            // no-op verifier below accepts every host name, so TLS peers are not
            // authenticated. Acceptable only for trusted networks; replace with a real
            // trust store before calling untrusted endpoints.
            @SuppressWarnings("deprecation")
            SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
                @Override
                public boolean isTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
                    return true;
                }
            }).build();
            httpClientBuilder.setSSLContext(sslContext);
            HostnameVerifier hostnameVerifier = NoopHostnameVerifier.INSTANCE;
            SSLConnectionSocketFactory sslConnectionSocketFactory = new SSLConnectionSocketFactory(sslContext,
                    hostnameVerifier);
            // Register socket factories for both plain HTTP and HTTPS.
            Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
                    .register("http", PlainConnectionSocketFactory.getSocketFactory())
                    .register("https", sslConnectionSocketFactory).build();

            // Connection pool limits.
            PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager(
                    socketFactoryRegistry);
            poolingHttpClientConnectionManager.setMaxTotal(2700); // max connections overall
            poolingHttpClientConnectionManager.setDefaultMaxPerRoute(100); // max concurrent connections per route
            httpClientBuilder.setConnectionManager(poolingHttpClientConnectionManager);

            HttpClient httpClient = httpClientBuilder.build();
            HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory(
                    httpClient);
            clientHttpRequestFactory.setConnectTimeout(20000); // connect timeout (ms)
            clientHttpRequestFactory.setReadTimeout(30000); // read timeout (ms)
            clientHttpRequestFactory.setConnectionRequestTimeout(20000); // wait for a pooled connection (ms)
            return clientHttpRequestFactory;
        } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
            // Fail fast with the real cause instead of returning null, which surfaced
            // later as an unrelated "request factory must not be null" error.
            throw new IllegalStateException("初始化HTTP连接池出错", e);
        }
    }
}

3. 调用

// Pushes the payload `rts` to push targets via RestTemplate POST requests.
// NOTE(review): this snippet was damaged when the article was extracted - the loop
// header, the per-target URL (`urisj`), the `headers` setup and the opening `try`
// are missing from the line that starts with `for(int i=0;i`. Restore them from the
// original project before using this code.
@Override
	public void postAllData(String rts) {
		// Log the size of the payload about to be pushed.
		System.out.println("进入推送,将要推送的数据大小为"+rts.length());
		// Load every row from datapushRepository.
		List datapush = datapushRepository.findAll();
		// (garbled line) wraps the payload and headers into an HttpEntity.
		for(int i=0;i formEntity = new HttpEntity(rts, headers);

				// POST the entity to urisj and read the response body as a String.
				String resulsj = restTemplate.postForObject(urisj, formEntity, String.class);

				System.out.println("web推送"+urisj+"推送了一条数据成功");
			} catch (Exception e) {
				// An exception is printed and execution continues past the catch.
				e.printStackTrace();
			}
		}
	}

七、EMQ-HTTP鉴权接口及API调用

1. EmqApiServiceImpl(EMQAPI调用接口)

package com.eric.etcloud.service.impl;

import com.eric.etcloud.service.EmqApiService;
import org.springframework.stereotype.Service;
import sun.misc.BASE64Encoder;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
 * Calls the EMQ management HTTP API with HTTP Basic authentication.
 */
@Service
public class EmqApiServiceImpl implements EmqApiService {
	// API user name.
	private static final String username = "admin";
	// API password.
	private static final String password = "public";
	// Base URL of the EMQ management API.
	private static final String serverPath = "http://127.0.0.1:18083";

	/**
	 * Performs a GET against the EMQ API and returns the response body.
	 *
	 * @param queryPathUrl path relative to the server base URL
	 * @param pageIndex    page number; paging parameters are appended only when both
	 *                     {@code pageIndex} and {@code pageSize} are positive
	 * @param pageSize     page size
	 * @return the response body decoded as UTF-8, or {@code null} when the status is not 200
	 * @throws Exception on connection or I/O failure
	 */
	@Override
	public  String query(String queryPathUrl, int pageIndex, int pageSize) throws Exception {
		// Append the paging parameters.
		if(pageIndex>0&&pageSize>0){
			queryPathUrl = queryPathUrl +"?" + "_page=" + pageIndex + "&" + "_limit=" + pageSize;
		}
		URL url = new URL(serverPath+queryPathUrl);
		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		try {
			// Basic auth is only encoded, not encrypted; use it over a trusted link.
			conn.setRequestProperty("authorization", "Basic "+getBase64(username, password));
			conn.setRequestMethod("GET");
			conn.connect();
			if (conn.getResponseCode() != 200) {
				return null;
			}
			// Read until end of stream. The previous available()/sleep loop could stop
			// early (available() may be 0 before data arrives) and slept 500 ms per KB.
			try (InputStream inputStream = conn.getInputStream();
					ByteArrayOutputStream resultB = new ByteArrayOutputStream()) {
				byte[] readBuffer = new byte[1024];
				int numBytes;
				while ((numBytes = inputStream.read(readBuffer)) != -1) {
					resultB.write(readBuffer, 0, numBytes);
				}
				return new String(resultB.toByteArray(), StandardCharsets.UTF_8);
			}
		} finally {
			// Release the connection even when an exception is thrown.
			conn.disconnect();
		}
	}

	/**
	 * Encodes {@code user:password} for an HTTP Basic authorization header.
	 * Uses the public {@code java.util.Base64} API, which emits no line breaks,
	 * instead of the internal {@code sun.misc.BASE64Encoder}.
	 */
	private static String getBase64(String admin, String aPublic) {
		final String text = admin+":"+aPublic;
		return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
	}
}

2. EmqApiController(EMQAPI控制器)

package com.eric.etcloud.controller;

import com.eric.etcloud.common.CommonController;
import com.eric.etcloud.common.annotation.JwtIgnore;
import com.eric.etcloud.common.response.Result;
import com.eric.etcloud.entity.EmqClient;
import com.eric.etcloud.service.EmqApiService;
import com.eric.etcloud.service.ProductService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletResponse;
import java.util.List;

@RestController
@RequestMapping("/emqapi")
@Api(value="EMQAPI接口",tags={"EMQAPI接口"})
public class EmqApiController extends CommonController {
	@Autowired
	EmqApiService emqApiService;
	@Autowired
	ProductService productService;

	private static Logger logger = LoggerFactory.getLogger(EmqApiController.class);

	/**
	 * Lists the clients known to EMQ, or looks up a single client when {@code clientid} is given.
	 *
	 * <p>Calls {@code /api/v4/clients} (or {@code /api/v4/clients/{clientid}}) through
	 * {@code emqApiService.query} with page 1 and a limit of 1000, and returns the {@code data}
	 * part of the parsed response.
	 *
	 * @param clientid optional client id; null or empty means "all clients"
	 * @return a success result carrying the client data, or a failure result when the query
	 *         throws, the response cannot be parsed, or no response body was obtained
	 */
	@ApiOperation(value = "获取所有客户端" ,notes = "获取所有客户端" )
	@ApiImplicitParams({
			@ApiImplicitParam(name = "clientid" ,value = "客户端clientId" , required = false, dataType = "String")
	})
	@RequestMapping(value = "/getAllClient", method = { RequestMethod.GET  })
	public Result getAllClient(String clientid){
		String url = "/api/v4/clients";
		if (clientid != null && !clientid.isEmpty()) {
			url = url + "/" + clientid;
		}
		try {
			String json = emqApiService.query(url, 1, 1000);
			// Parsing stays inside the try so a malformed body becomes a failure result
			// instead of an unhandled exception.
			EmqClient queryResult = gson.fromJson(json, EmqClient.class);
			if (queryResult == null) {
				// query(...) returns null when EMQ does not answer with HTTP 200, and a
				// null/empty body parses to null; without this guard getData() would throw.
				logger.warn("EMQ query {} returned no parsable body", url);
				return Result.FAIL("获取数据失败");
			}
			List data = queryResult.getData();
			return Result.SUCCESS(data);
		} catch (Exception e) {
			logger.error("EMQ query {} failed", url, e);
			return Result.FAIL("获取数据失败");
		}
	}
	/**
	 * Connection-authorization endpoint ({@code POST /emqapi/auth}), exempt from JWT checks via
	 * {@code @JwtIgnore}.
	 *
	 * <p>Answers through the HTTP status only: 200 when the check on
	 * {@code productService.countDevNumByPid(username)} passes, 401 otherwise.
	 *
	 * <p>NOTE(review): the {@code if} condition below is truncated in this copy of the listing
	 * (the comparison after {@code countDevNumByPid(username)} has been lost); restore it from
	 * the original project before compiling.
	 *
	 * <p>NOTE(review): the client password is written in plaintext to the logger and to stdout.
	 *
	 * @param clientid client id sent with the request
	 * @param username user name sent with the request; passed to {@code countDevNumByPid}
	 * @param password password sent with the request; only logged in the visible code
	 * @param response servlet response whose status carries the decision
	 */
	@ApiOperation(value = "客户端连接授权" ,notes = "客户端连接授权" )
	@ApiImplicitParams({
			@ApiImplicitParam(name = "clientid" ,value = "客户端clientId" , required = false, dataType = "String"),
			@ApiImplicitParam(name = "username" ,value = "客户端username" , required = false, dataType = "String"),
			@ApiImplicitParam(name = "password" ,value = "客户端password" , required = false, dataType = "String")
	})
	@RequestMapping(value = "/auth", method = RequestMethod.POST)
	@JwtIgnore
	public void checkUser(String clientid, String username, String password, HttpServletResponse response) {
		logger.info("普通用户;clientid:" + clientid + ";username:" + username + ";password:" + password);
		System.out.println("登录接口");
		// Original note: compute the user's remaining devices.
		// NOTE(review): truncated condition, see the method comment.
		if(productService.countDevNumByPid(username)0) {
			System.out.println("clientid:"+clientid+",username:"+username+",password:"+password+"允许通过");
			response.setStatus(200);
		} else {
			// Check failed: reject the connection.
			System.out.println("clientid:"+clientid+",username:"+username+",password:"+password+"产品信息错误,禁止通过");
			response.setStatus(401);
		}
	}

	/**
	 * Superuser-check endpoint ({@code /emqapi/superuser}), exempt from JWT checks.
	 *
	 * <p>Always sets HTTP status 200. Client ids starting with {@code server_client_},
	 * {@code web_client_} or {@code wxapp_client_} are answered silently; any other client id
	 * is logged first. A null {@code clientid} raises a NullPointerException.
	 *
	 * <p>NOTE(review): because every request gets 200, the broker presumably treats every
	 * client as a superuser - confirm this is intended.
	 *
	 * @param clientid client id sent with the request
	 * @param username user name sent with the request; only logged
	 * @param response servlet response whose status carries the decision
	 */
	@RequestMapping("/superuser")
	@JwtIgnore
	public void mqttSuperuser(String clientid, String username, HttpServletResponse response) {
		//auth.http.super_req.params = clientid=%c,username=%u
		boolean platformClient = clientid.startsWith("server_client_")
				|| clientid.startsWith("web_client_")
				|| clientid.startsWith("wxapp_client_");
		if (!platformClient) {
			// Only clients without a platform prefix leave a trace in the logs.
			String line = "超级用户;clientid:" + clientid + ";username:" + username;
			logger.info(line);
			System.out.println(line);
		}
		response.setStatus(200);
	}

	/**
	 * ACL-check endpoint ({@code /emqapi/acl}), exempt from JWT checks.
	 *
	 * <p>Logs the request parameters to the logger and to stdout, then always sets HTTP
	 * status 200; none of the parameters influence the status.
	 *
	 * @param access   access value sent with the request
	 * @param username user name sent with the request
	 * @param clientid client id sent with the request
	 * @param ipaddr   client address sent with the request
	 * @param topic    topic sent with the request
	 * @param response servlet response whose status carries the decision
	 */
	@RequestMapping("/acl")
	@JwtIgnore
	public void mqttAcl(String access, String username, String clientid, String ipaddr, String topic, HttpServletResponse response) {
		//auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t
		String line = new StringBuilder("access: ").append(access)
				.append(";username: ").append(username)
				.append(";clientid: ").append(clientid)
				.append("; ipaddr: ").append(ipaddr)
				.append(";topic: ").append(topic)
				.toString();
		logger.info(line);
		System.out.println(line);
		response.setStatus(200);
	}

}

 

微信部分的集成可参考 https://github.com/binarywang 。

本文章来源于互联网,如有侵权,请联系删除!原文地址:Spring+mqtt 搭建物联网平台服务端

相关推荐: 物联网竞赛入门

文章目录 物联网竞赛入门 第一天 初识TinyOS TinyOS是什么? TinyOS的编程方式 Modules(模块) Configuration(配置) Interfaces(接口) Headers(头文件) NesC语言编程格式 物联网竞赛入门 by:阿…