MQTT数据测试也没得问题了,虽然我也不知道为啥消息要存储要数据库,但是老大有需求,责无旁贷嘛,顺便我还想试试Knife4j框架,是基于Swagger搭建的,不会Swagger和MQTT消息发布接受请参看:
Springboot整合Swagger
MQTT客户端的安装并连接两种服务器
MQTT两种服务器的安装使用
MQTT使用Java连接测试-一
MQTT使用Java连接测试-二
1.毫无疑问先导包,导入Knife4j、Druid、MybatisPlus以及Mysql的依赖,需要更换其他版本,可以去中央仓库更换(md,两次非人类验证就很烦),
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| <!-- 引入mybatis-plus的依赖--> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.2</version> </dependency> <!-- 引入数据源连接池德鲁伊Druid--> <!-- https://mvnrepository.com/artifact/com.alibaba/druid --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.20</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/com.github.xiaoymin/swagger-bootstrap-ui --> <dependency> <groupId>com.github.xiaoymin</groupId> <artifactId>swagger-bootstrap-ui</artifactId> <version>1.9.6</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-validation</artifactId> </dependency>
<!-- https://mvnrepository.com/artifact/com.github.xiaoymin/knife4j-spring-boot-starter --> <dependency> <groupId>com.github.xiaoymin</groupId> <artifactId>knife4j-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
|
2.新建实体类,这里主要说一下主键生成策略,我试了一下MybatisPlus提供的@TableId(value = “ID”,type = IdType.AUTO)方式,又试了一下JPA提供的@GeneratedValue(strategy = GenerationType.IDENTITY)方式,说实话,md,一模一样,随便用哪种,
3.新建一个Mapper,用于继承MybatisPlus的baseMapper,简单的CRUD我真觉得没必要写啊,你说复杂的连表多表之类谢谢sql,这种还写sql纯属有点浪费时间了,
1 2 3 4 5 6 7 8 9 10 11 12
| package cn.zl.springbootmqtt.mapper;
import cn.zl.springbootmqtt.entity.AlertInfo; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
public interface AlertInfoMapper extends BaseMapper<AlertInfo> { }
|
4.好像忘了说Knife4j的框架,主要是和Swagger一模一样,就给忘了,就是换一下依赖,登录的路径不一样,其他?md,一样的啊,说一下开启Authorize加token吧,(虽然我这里没用到,但是在我上一个项目用了,感觉很舒服),注意和我上一篇文章对比一下,就知道了,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| package cn.zl.springbootmqtt.config;
import com.fasterxml.classmate.TypeResolver; import com.github.xiaoymin.knife4j.spring.annotations.EnableKnife4j; import com.github.xiaoymin.swaggerbootstrapui.service.SpringAddtionalModelService; import com.google.common.collect.Lists; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import springfox.bean.validators.configuration.BeanValidatorPluginsConfiguration; import springfox.documentation.builders.PathSelectors; import springfox.documentation.builders.RequestHandlerSelectors; import springfox.documentation.service.*; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spi.service.contexts.SecurityContext; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.ArrayList; import java.util.List;
@Configuration @EnableSwagger2 @EnableKnife4j @Import(BeanValidatorPluginsConfiguration.class) public class SwaggerConfig {
@Bean(value = "groupRestApi") public Docket groupRestApi(){ return new Docket(DocumentationType.SWAGGER_2) .apiInfo(apiInfo()) .groupName("设备模块") .select() .apis(RequestHandlerSelectors.basePackage("cn.zl.springbootmqtt.controller")) .paths(PathSelectors.any()) .build() .securityContexts(Lists.newArrayList(securityContext(),securityContext())).securitySchemes(Lists.<SecurityScheme>newArrayList(apiKey())); }
private ApiKey apiKey() { return new ApiKey("token", "Authorization", "header"); }
private SecurityContext securityContext() { return SecurityContext.builder() .securityReferences(defaultAuth()) .forPaths(PathSelectors.regex("/.*")) .build(); }
List<SecurityReference> defaultAuth() { AuthorizationScope authorizationScope = new AuthorizationScope("global", "accessEverything"); AuthorizationScope[] authorizationScopes = new AuthorizationScope[1]; authorizationScopes[0] = authorizationScope; return Lists.newArrayList(new SecurityReference("token", authorizationScopes)); }
private ApiInfo apiInfo(){ Contact contact = new Contact("ZL","http://www.zllwsy.com","zllwsy@outlook.com"); return new ApiInfo( "ZL的SwaggerAPI文档", "学习永无止境", "v1.0", "localhost:8088/doc.html", contact, "Apache 2.0", "http://www.apache.org/licenses/LICENSE-2.0", new ArrayList() ); } }
|
5.写一写Controller,我把发布和订阅放在一起了,方便调用接口,参数是我为了测试一下Knife4j的功能展示,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @Api(tags = "订阅发布") @RestController @RequestMapping("/topic") public class MqttController { private static final Random RANDOM = new Random(); @ApiOperationSupport(author = "zllwsy") @ApiOperation(value="发布消息", notes="发布消息", produces="application/json") @ApiImplicitParams({ @ApiImplicitParam(name = "word", value = "单词", paramType = "query", required = true, dataType = "String"), @ApiImplicitParam(name = "name", value = "名字", paramType = "query", required = true, dataType = "String") }) @GetMapping("/push") public void pushTopic(String word,String name) { MqttPushClient instance = MqttPushClient.getInstance(); MqttMessage message = new MqttMessage(); AlertInfo alertInfo = new AlertInfo(); alertInfo.setWdaDeviceCode("007"); alertInfo.setWdaDeviceType("21"); alertInfo.setWdaAlarmType("1"); alertInfo.setWdaAlarmTime(new Date()); alertInfo.setWdaAlarmContent("到底啥数据呀"); alertInfo.setWdaRemark("我傻了"); String s = JSONObject.toJSONString(alertInfo); message.setPayload(s.getBytes()); instance.publish("topicPub", message); }
@ApiOperationSupport(author = "zllwsy") @ApiOperation(value="订阅消息", notes="订阅消息", produces="application/json") @GetMapping("/sub") public void subTopic(){ MqttSubClient instance = MqttSubClient.getInstance(); instance.subscribe("topicPub"); } }
|
6.在订阅客户端的回调函数里面对接收到的消息进行处理,也是在上次的基础上添加的,不懂得一定要看看另外几篇,我觉得挺详细的,因为MQTT接收的数据是byte[]格式,我就把它放进队列里面,然后再取出来转为了String,再将String转换为实体类,然后就插入数据库了,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
@Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + mqttMessage.getQos()); System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload())); Queue<byte[]> q = new LinkedList<byte[]>(); q.add(mqttMessage.getPayload()); byte[] c = q.peek(); String res = new String(c); AlertInfo alertInfo = JSONObject.parseObject(res , AlertInfo.class); System.out.println(alertInfo); if (alertInfo != null){ try { alertInfoMapper = BeanUtil.getBean(AlertInfoMapper.class); alertInfoMapper.insert(alertInfo); }catch (Exception e){ e.printStackTrace(); logger.debug("error"); } }else { System.out.println("device的值为null"); } }
|
7.如果是认真在看再学的已经发现了一下问题,这里我使用了手动获取bean,
8.为啥手动获取?(手动黑脸),没办法啊,不知道为啥注入一直失败报alertInfoMapper为null,Mapper和Service都失败,我找了好久的问题,最后可能是这个回调方法运行的时候,Mapper都还没有开始加载,反正就一直alertInfoMapper报空指针,最后实在没办法,我就只有手动写个工具获取bean,(可别说我没写@MapperScan注解了,谁试谁知道,)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package cn.zl.springbootmqtt.util;
import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component;
@Component public class BeanUtil implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContextParam) throws BeansException { applicationContext=applicationContextParam; } public static Object getObject(String id) { Object object = null; object = applicationContext.getBean(id); return object; } public static <T> T getObject(Class<T> tClass) { return applicationContext.getBean(tClass); }
public static Object getBean(String tClass) { return applicationContext.getBean(tClass); }
public static <T> T getBean(Class<T> tClass) { return applicationContext.getBean(tClass); } }
|
9.害,勉强过得去,启动项目,记得加上@EnableSwagger2注解
1 2 3 4 5 6 7 8 9
| @SpringBootApplication @EnableSwagger2 @MapperScan("cn.zl.springbootmqtt.mapper") public class SpringbootMqttApplication {
public static void main(String[] args) { SpringApplication.run(SpringbootMqttApplication.class, args); } }
|
10.在浏览器输入路径,http://localhost:8088/doc.html,当然你输入你自己的ip和端口,别说,还挺好看,
11.测试,先订阅,再发布,很抱歉,这里是回调函数,没得返回的参数,就很尴尬,所以还是看控制台的输出和数据库数据的生成把,
12.差点忘了,还有个yml配置,还是需要给一份,万一以后我自己忘了呢,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| #mq配置 com: mqtt: host: tcp://x.x.x.x:1883 clientId: SubServer #一定要唯一 topic: good,test,yes username: admin password: public timeout: 10 keepalive: 20
#配置数据库 spring: datasource: username: xxxx password: xxxx url: jdbc:mysql://x.x.x.x:3306/lslmp?characterEncoding=UTF-8&serverTimezone=UTC type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.cj.jdbc.Driver
#日志 logging: level: root: info cn: zl: springbootmqtt: debug
mybatis-plus: configuration: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl map-underscore-to-camel-case: true type-aliases-package: cn.zl.springbootmqtt.entity
server: port: 8088
|
13.有一个时区的问题,将数据库连接的时候serverTimezone=UTC改为serverTimezone=GMT%2B8就可以了,
14.给一下接口框架的常用注解解析,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Api:修饰整个类,描述Controller的作用
@ApiOperation:描述一个类的一个方法,或者说一个接口
@ApiParam:单个参数描述
@ApiModel:用对象来接收参数
@ApiProperty:用对象接收参数时,描述对象的一个字段
@ApiResponse:HTTP响应其中1个描述
@ApiResponses:HTTP响应整体描述
@ApiIgnore:使用该注解忽略这个API
@ApiError :发生错误返回的信息
@ApiImplicitParam:描述一个请求参数,可以配置参数的中文含义,还可以给参数设置默认值
@ApiImplicitParams:描述由多个 @ApiImplicitParam 注解的参数组成的请求参数列表
|
总结,遇到了一些问题,尤其是Mapper那里为null的时候,真的找了好久,其实现在也不是很确定原因,总归是实现了,你学到了嘛,