0%

MQTT使用Java连接测试-一

到目前为止,服务装好了,客户端连接上了,如果还没搞定(不会吧不会还有人没搞定吧?),请参看:

MQTT两种服务器的安装使用

MQTT客户端的安装并连接两种服务器

一切都完事了后,那么就开始重头戏,Springboot集成MQTT的测试了。

1.新建一个Springboot项目(这不可能有人不会吧),查了多方资料,然后去maven仓库查看了一下,这三个依赖是需要滴,导入

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- MQTT-jar -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>

2.首先写一下配置文件,自动生成的是properties文件,我不习惯,改成yml文件了,请注意,clientId一定要唯一,要唯一,要唯一,希望你能看出我吃了什么亏,

1
2
3
4
5
6
7
8
9
10
#mq配置
com:
mqtt:
host: tcp://127.0.0.1:1883
clientId: exqmId #一定要唯一
topic: good,test,yes
username: admin
password: public
timeout: 10
keepalive: 20

3.为了方便测试,我在Idea里面写了两个客户端,一个用于发布,一个用于订阅,先说明,其实是可以写一个就行,客户端是既能发布又能订阅的,先写一个类来获取yml里面的配置信息(我也试了在实体类里面使用@Component和@Value来获取,是可行的),新建了一个ymlUtil类,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package cn.zl.springbootmqtt.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
* @Description:配置类-从yml配置中获取数据
* @Author: zhanglang
* @Date: 2020/10/27 14:24
*/
public class ymlUtil {
public static String MQTT_HOST;
public static String MQTT_CLIENTID;
public static String MQTT_USER_NAME;
public static String MQTT_PASSWORD;
public static Integer MQTT_TIMEOUT;
public static Integer MQTT_KEEP_ALIVE;

static {
MQTT_HOST = loadMqttYml().getProperty("host");
MQTT_CLIENTID = loadMqttYml().getProperty("clientId");
MQTT_USER_NAME = loadMqttYml().getProperty("username");
MQTT_PASSWORD = loadMqttYml().getProperty("password");
MQTT_TIMEOUT = Integer.valueOf(loadMqttYml().getProperty("timeout"));
MQTT_KEEP_ALIVE = Integer.valueOf(loadMqttYml().getProperty("keepalive"));
}

/**
* 从yml从获取数据
* @return
*/
private static Properties loadMqttYml() {
InputStream inputstream = ymlUtil.class.getResourceAsStream("/application.yml");
Properties properties = new Properties();
try {
properties.load(inputstream);
return properties;
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
if (inputstream != null) {
inputstream.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

}

4.然后新建一个MqttPushClient类,来发布消息,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package cn.zl.springbootmqtt.pub;

import cn.zl.springbootmqtt.util.ymlUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
* @Description:MQTT消息推送或订阅客户端
* @Author: zhanglang
* @Date: 2020/10/27 14:19
*/
@Slf4j
public class MqttPushClient {
private MqttClient client;

private static volatile MqttPushClient mqttPushClient = null;

public static MqttPushClient getInstance() {
if (null == mqttPushClient) {
synchronized (MqttPushClient.class) {
if (null == mqttPushClient) {
mqttPushClient = new MqttPushClient();
}
}
}
return mqttPushClient;
}

private MqttPushClient() {
connect();
}

public void connect() {
try {
client = new MqttClient(ymlUtil.MQTT_HOST, "pub", new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
//连接选项
options.setCleanSession(false);
options.setUserName(ymlUtil.MQTT_USER_NAME);
options.setPassword(ymlUtil.MQTT_PASSWORD.toCharArray());
options.setConnectionTimeout(ymlUtil.MQTT_TIMEOUT);
options.setKeepAliveInterval(ymlUtil.MQTT_KEEP_ALIVE);
//保留会话
options.setCleanSession(true);
//设置回调
client.setCallback(new PushCallBack());
//建立连接
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 发布,默认qos为0,非持久化
*
* @param topic
* @param pushMessage
*/
public void publish(String topic, MqttMessage pushMessage) {
publish(0, false, topic, pushMessage);
}

private void publish(int qos, boolean retained, String topic, MqttMessage pushMessage) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(pushMessage.toString().getBytes());

MqttTopic mqttTopic = client.getTopic(topic);
if (null == mqttTopic) {
log.error("topic not exist");
}
MqttDeliveryToken token;
try {
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
}

5.新建一个类,用于实现回调函数,实现一下MQttCallback类,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package cn.zl.springbootmqtt.pub;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
* @Description:MQTT推送回调类
* @Author: zhanglang
* @Date: 2020/10/27 14:40
*/
public class PushCallBack implements MqttCallback {

/**
* 连接丢失,用于重连
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("连接已断开,尝试重新连接。。。");
}

/**
* 消息已经送达,接收到消息
* @param topic
* @param mqttMessage
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + mqttMessage.getQos());
System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));
}

/**
* publish后会执行到这里,发送的状态
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("deliveryComplete..." + iMqttDeliveryToken.isComplete());
}
}

6.新建一个类用于测试,我习惯于controller,所以新建了一个PushController类,清晰明了,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package cn.zl.springbootmqtt.controller;

import cn.zl.springbootmqtt.entity.Equiment;
import cn.zl.springbootmqtt.pub.MqttPushClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* @Description:
* @Author: zhanglang
* @Date: 2020/10/27 14:44
*/
public class PushController {
public static void main(String[] args) {
MqttPushClient instance = MqttPushClient.getInstance();
MqttMessage message = new MqttMessage();
instance.publish("topicPub",message);
}
}

7.发布的客户端搭建好了,再来以此建立一下订阅的客户端,新建MqttSubClient类,这里一定要注意,ClientId不能重复,

1
client = new MqttClient(ymlUtil.MQTT_HOST, "xxx", new MemoryPersistence());//因为ClientId不能重复,所以这里我就没有再调用工具类来获取yml中的ClientId
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package cn.zl.springbootmqtt.sub;

import cn.zl.springbootmqtt.pub.MqttPushClient;
import cn.zl.springbootmqtt.pub.PushCallBack;
import cn.zl.springbootmqtt.util.ymlUtil;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
* @Description:
* @Author: zllwsy
* @Date: 2020/10/27 17:21
*/
public class MqttSubClient {
private MqttClient client;

private static volatile MqttSubClient mqttSubClient = null;

public static MqttSubClient getInstance() {
if (null == mqttSubClient) {
synchronized (MqttPushClient.class) {
if (null == mqttSubClient) {
mqttSubClient = new MqttSubClient();
}
}
}
return mqttSubClient;
}

private MqttSubClient() {
connect();
}

private void connect() {
try {
client = new MqttClient(ymlUtil.MQTT_HOST, "sub", new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
//连接选项
options.setCleanSession(false);
options.setUserName(ymlUtil.MQTT_USER_NAME);
options.setPassword(ymlUtil.MQTT_PASSWORD.toCharArray());
options.setConnectionTimeout(ymlUtil.MQTT_TIMEOUT);
options.setKeepAliveInterval(ymlUtil.MQTT_KEEP_ALIVE);
//保留会话
options.setCleanSession(true);
//设置回调
client.setCallback(new PushCallBack());
//建立连接
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 订阅某个主题,qos默认为0
* @param topic
*/
public void subscribe(String topic) {
subscribe(topic, 0);
}

/**
* 订阅某个主题
* @param topic
* @param qos
*/
public void subscribe(String topic, int qos) {
try {
client.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}

8.订阅客户端的回调类,新建SubCallBack,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package cn.zl.springbootmqtt.sub;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
* @Description:MQTT订阅回调类
* @Author: zhanglang
* @Date: 2020/10/27 14:55
*/

public class SubCallBack implements MqttCallback {
/**
* 连接丢失,用于重连
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("连接已断开,尝试重新连接。。。");
}

/**
* 消息已经送达,接收到消息
* @param topic
* @param mqttMessage
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + mqttMessage.getQos());
System.out.println("接收消息内容 : " + new String(mqttMessage.getPayload()));
}
/**
* publish后会执行到这里,发送的状态
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("deliveryComplete..." + iMqttDeliveryToken.isComplete());
}
}

9.新建SubController类,用于订阅主题,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package cn.zl.springbootmqtt.controller;

import cn.zl.springbootmqtt.sub.MqttSubClient;
/**
* @Description:
* @Author: zllwsy
* @Date: 2020/10/27 17:31
*/
public class SubController {
public static void main(String[] args) {
MqttSubClient instance = MqttSubClient.getInstance();
instance.subscribe("topicPub");
}
}

总结,由于篇幅太长,看起来很难受,就将测试放在了下一篇,点击MQTT使用Java连接测试(二),另一个说明,有人问我为啥把引入的包都贴出来,一个是截图方便(ctrl+A,ctrl+c,ctrl+v),二是搞错包也挺烦了,有时候出错就是包不对,有问题请指出。

----------本文结束感谢您的阅读----------