0%

MQTT接收消息存储到数据库-使用Knife4j框架

MQTT数据测试也没得问题了,虽然我也不知道为啥消息要存储要数据库,但是老大有需求,责无旁贷嘛,顺便我还想试试Knife4j框架,是基于Swagger搭建的,不会Swagger和MQTT消息发布接受请参看:

Springboot整合Swagger

MQTT客户端的安装并连接两种服务器

MQTT两种服务器的安装使用

MQTT使用Java连接测试-一

MQTT使用Java连接测试-二

1.毫无疑问先导包,导入Knife4j、Druid、MybatisPlus以及Mysql的依赖,需要更换其他版本,可以去中央仓库更换(md,两次非人类验证就很烦),

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<!--        引入mybatis-plus的依赖-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.3.2</version>
</dependency>
<!-- 引入数据源连接池德鲁伊Druid-->
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.20</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.xiaoymin/swagger-bootstrap-ui -->
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>swagger-bootstrap-ui</artifactId>
<version>1.9.6</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.github.xiaoymin/knife4j-spring-boot-starter -->
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>knife4j-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>

2.新建实体类,这里主要说一下主键生成策略,我试了一下MybatisPlus提供的@TableId(value = “ID”,type = IdType.AUTO)方式,又试了一下JPA提供的@GeneratedValue(strategy = GenerationType.IDENTITY)方式,说实话,md,一模一样,随便用哪种,

16.1

3.新建一个Mapper,用于继承MybatisPlus的baseMapper,简单的CRUD我真觉得没必要写啊,你说复杂的连表多表之类谢谢sql,这种还写sql纯属有点浪费时间了,

1
2
3
4
5
6
7
8
9
10
11
12
package cn.zl.springbootmqtt.mapper;

import cn.zl.springbootmqtt.entity.AlertInfo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;

/**
* @Description:
* @Author: zllwsy
* @Date: 2020/11/5 16:59
*/
public interface AlertInfoMapper extends BaseMapper<AlertInfo> {
}

4.好像忘了说Knife4j的框架,主要是和Swagger一模一样,就给忘了,就是换一下依赖,登录的路径不一样,其他?md,一样的啊,说一下开启Authorize加token吧,(虽然我这里没用到,但是在我上一个项目用了,感觉很舒服),注意和我上一篇文章对比一下,就知道了,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package cn.zl.springbootmqtt.config;


import com.fasterxml.classmate.TypeResolver;
import com.github.xiaoymin.knife4j.spring.annotations.EnableKnife4j;
import com.github.xiaoymin.swaggerbootstrapui.service.SpringAddtionalModelService;
import com.google.common.collect.Lists;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import springfox.bean.validators.configuration.BeanValidatorPluginsConfiguration;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.*;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spi.service.contexts.SecurityContext;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

import java.util.ArrayList;
import java.util.List;

/**
* @Description:
* @Author: zllwsy
* @Date: 2020/11/5 11:45
*/
@Configuration
@EnableSwagger2 //开启Swagger2
@EnableKnife4j
@Import(BeanValidatorPluginsConfiguration.class)
public class SwaggerConfig {

@Bean(value = "groupRestApi")
public Docket groupRestApi(){
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.groupName("设备模块")
.select()
.apis(RequestHandlerSelectors.basePackage("cn.zl.springbootmqtt.controller"))
.paths(PathSelectors.any())
.build()
.securityContexts(Lists.newArrayList(securityContext(),securityContext())).securitySchemes(Lists.<SecurityScheme>newArrayList(apiKey()));
}

private ApiKey apiKey() {
return new ApiKey("token", "Authorization", "header");
}

private SecurityContext securityContext() {
return SecurityContext.builder()
.securityReferences(defaultAuth())
.forPaths(PathSelectors.regex("/.*"))
.build();
}

List<SecurityReference> defaultAuth() {
AuthorizationScope authorizationScope = new AuthorizationScope("global", "accessEverything");
AuthorizationScope[] authorizationScopes = new AuthorizationScope[1];
authorizationScopes[0] = authorizationScope;
return Lists.newArrayList(new SecurityReference("token", authorizationScopes));
}

//配置Swagger信息-apiInfo
private ApiInfo apiInfo(){
//作者信息
Contact contact = new Contact("ZL","http://www.zllwsy.com","zllwsy@outlook.com");
return new ApiInfo(
"ZL的SwaggerAPI文档",
"学习永无止境",
"v1.0",
"localhost:8088/doc.html",
contact,
"Apache 2.0",
"http://www.apache.org/licenses/LICENSE-2.0",
new ArrayList()
);
}
}

5.写一写Controller,我把发布和订阅放在一起了,方便调用接口,参数是我为了测试一下Knife4j的功能展示,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Api(tags = "订阅发布")
@RestController
@RequestMapping("/topic")
public class MqttController {
private static final Random RANDOM = new Random();
@ApiOperationSupport(author = "zllwsy")
@ApiOperation(value="发布消息", notes="发布消息", produces="application/json")
@ApiImplicitParams({
@ApiImplicitParam(name = "word", value = "单词", paramType = "query", required = true, dataType = "String"),
@ApiImplicitParam(name = "name", value = "名字", paramType = "query", required = true, dataType = "String")
})
@GetMapping("/push")
public void pushTopic(String word,String name) {
MqttPushClient instance = MqttPushClient.getInstance();
MqttMessage message = new MqttMessage();
AlertInfo alertInfo = new AlertInfo();
alertInfo.setWdaDeviceCode("007");
alertInfo.setWdaDeviceType("21");
alertInfo.setWdaAlarmType("1");
alertInfo.setWdaAlarmTime(new Date());
alertInfo.setWdaAlarmContent("到底啥数据呀");
alertInfo.setWdaRemark("我傻了");
String s = JSONObject.toJSONString(alertInfo);
message.setPayload(s.getBytes());
instance.publish("topicPub", message);
}

@ApiOperationSupport(author = "zllwsy")
@ApiOperation(value="订阅消息", notes="订阅消息", produces="application/json")
@GetMapping("/sub")
public void subTopic(){
MqttSubClient instance = MqttSubClient.getInstance();
instance.subscribe("topicPub");
}
}

6.在订阅客户端的回调函数里面对接收到的消息进行处理,也是在上次的基础上添加的,不懂得一定要看看另外几篇,我觉得挺详细的,因为MQTT接收的数据是byte[]格式,我就把它放进队列里面,然后再取出来转为了String,再将String转换为实体类,然后就插入数据库了,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 消息已经送达,接收到消息
* @param topic
* @param mqttMessage
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + mqttMessage.getQos());
System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));
Queue<byte[]> q = new LinkedList<byte[]>();
q.add(mqttMessage.getPayload()); //放入队列
byte[] c = q.peek(); //取得队头元素
String res = new String(c);//使用构造函数转换成字符串
AlertInfo alertInfo = JSONObject.parseObject(res , AlertInfo.class);
System.out.println(alertInfo);
if (alertInfo != null){
try {
alertInfoMapper = BeanUtil.getBean(AlertInfoMapper.class);
alertInfoMapper.insert(alertInfo);
}catch (Exception e){
e.printStackTrace();
logger.debug("error");
}
}else {
System.out.println("device的值为null");
}
}

7.如果是认真在看再学的已经发现了一下问题,这里我使用了手动获取bean,

16.2

8.为啥手动获取?(手动黑脸),没办法啊,不知道为啥注入一直失败报alertInfoMapper为null,Mapper和Service都失败,我找了好久的问题,最后可能是这个回调方法运行的时候,Mapper都还没有开始加载,反正就一直alertInfoMapper报空指针,最后实在没办法,我就只有手动写个工具获取bean,(可别说我没写@MapperScan注解了,谁试谁知道,)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package cn.zl.springbootmqtt.util;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
* @Description:
* @Author: zllwsy
* @Date: 2020/11/5 10:25
*/
@Component
public class BeanUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContextParam) throws BeansException {
applicationContext=applicationContextParam;
}
public static Object getObject(String id) {
Object object = null;
object = applicationContext.getBean(id);
return object;
}
public static <T> T getObject(Class<T> tClass) {
return applicationContext.getBean(tClass);
}

public static Object getBean(String tClass) {
return applicationContext.getBean(tClass);
}

public static <T> T getBean(Class<T> tClass) {
return applicationContext.getBean(tClass);
}
}

9.害,勉强过得去,启动项目,记得加上@EnableSwagger2注解

1
2
3
4
5
6
7
8
9
@SpringBootApplication
@EnableSwagger2
@MapperScan("cn.zl.springbootmqtt.mapper")
public class SpringbootMqttApplication {

public static void main(String[] args) {
SpringApplication.run(SpringbootMqttApplication.class, args);
}
}

10.在浏览器输入路径,http://localhost:8088/doc.html,当然你输入你自己的ip和端口,别说,还挺好看,

16.3

11.测试,先订阅,再发布,很抱歉,这里是回调函数,没得返回的参数,就很尴尬,所以还是看控制台的输出和数据库数据的生成把,

16.4

16.5

16.6

12.差点忘了,还有个yml配置,还是需要给一份,万一以后我自己忘了呢,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#mq配置
com:
mqtt:
host: tcp://x.x.x.x:1883
clientId: SubServer #一定要唯一
topic: good,test,yes
username: admin
password: public
timeout: 10
keepalive: 20

#配置数据库
spring:
datasource:
username: xxxx
password: xxxx
url: jdbc:mysql://x.x.x.x:3306/lslmp?characterEncoding=UTF-8&serverTimezone=UTC
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver

#日志
logging:
level:
root: info
cn:
zl:
springbootmqtt: debug

mybatis-plus:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
map-underscore-to-camel-case: true
type-aliases-package: cn.zl.springbootmqtt.entity

server:
port: 8088

13.有一个时区的问题,将数据库连接的时候serverTimezone=UTC改为serverTimezone=GMT%2B8就可以了,

16.7

14.给一下接口框架的常用注解解析,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Api:修饰整个类,描述Controller的作用

@ApiOperation:描述一个类的一个方法,或者说一个接口

@ApiParam:单个参数描述

@ApiModel:用对象来接收参数

@ApiProperty:用对象接收参数时,描述对象的一个字段

@ApiResponse:HTTP响应其中1个描述

@ApiResponses:HTTP响应整体描述

@ApiIgnore:使用该注解忽略这个API

@ApiError :发生错误返回的信息

@ApiImplicitParam:描述一个请求参数,可以配置参数的中文含义,还可以给参数设置默认值

@ApiImplicitParams:描述由多个 @ApiImplicitParam 注解的参数组成的请求参数列表

总结,遇到了一些问题,尤其是Mapper那里为null的时候,真的找了好久,其实现在也不是很确定原因,总归是实现了,你学到了嘛,

----------本文结束感谢您的阅读----------