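So far the server is installed and the client is connected. If you haven't got that far yet (surely not, surely nobody is still stuck?), see:
Installing and using the two MQTT servers
Installing an MQTT client and connecting to both servers
With all of that done, on to the main event: testing MQTT integration with Spring Boot.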
1. Create a new Spring Boot project (surely everyone can do this). After going through various sources and checking the Maven repository, these three dependencies turned out to be needed, so add them. The publisher class in step 4 also uses Lombok's @Slf4j, which is not among these three, so Lombok has to be on the classpath as well.

    <!-- MQTT-jar -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>
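2. Start with the configuration file. The generated one is a properties file, which I'm not used to, so I changed it to yml. Note that the clientId must be unique, must be unique, must be unique. I hope you can guess how I learned that.

    # MQTT configuration
    com:
      mqtt:
        host: tcp://127.0.0.1:1883
        # clientId must be unique (comment kept on its own line so it cannot end up inside the value)
        clientId: exqmId
        topic: good,test,yes
        username: admin
        password: public
        timeout: 10
        keepalive: 20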
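One way to avoid clashing client IDs is to add a random suffix to a configured prefix. The sketch below is only an illustration: `ClientIds` and `unique` are hypothetical names, not part of the original project. It keeps the result alphanumeric and short, on the assumption that the broker only guarantees to accept IDs of up to 23 characters from 0-9a-zA-Z, as MQTT 3.1.1 requires of servers.

```java
import java.util.UUID;

/** Hypothetical helper (not part of the original project) that derives a per-process client ID. */
class ClientIds {

    /** Appends 8 random hex characters to the prefix, so "exqmId" becomes a 14-character ID. */
    static String unique(String prefix) {
        String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
        return prefix + suffix;
    }

    public static void main(String[] args) {
        // Each call yields a different ID with the same prefix.
        System.out.println(unique("exqmId"));
        System.out.println(unique("exqmId"));
    }
}
```

The trade-off: a broker keys a persistent session on the client ID, so a random ID on every start only makes sense together with a clean session.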
3. To make testing easier, I wrote two clients in IDEA, one for publishing and one for subscribing. To be clear, a single client would do, since a client can both publish and subscribe. First, a class to read the settings from the yml file (I also tried @Component with @Value in an entity class, which works too). I created a ymlUtil class:

    package cn.zl.springbootmqtt.util;

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    public class ymlUtil {
        public static String MQTT_HOST;
        public static String MQTT_CLIENTID;
        public static String MQTT_USER_NAME;
        public static String MQTT_PASSWORD;
        public static Integer MQTT_TIMEOUT;
        public static Integer MQTT_KEEP_ALIVE;

        static {
            // Load the file once and read every setting from the same Properties object.
            Properties properties = loadMqttYml();
            MQTT_HOST = properties.getProperty("host");
            MQTT_CLIENTID = properties.getProperty("clientId");
            MQTT_USER_NAME = properties.getProperty("username");
            MQTT_PASSWORD = properties.getProperty("password");
            MQTT_TIMEOUT = Integer.valueOf(properties.getProperty("timeout").trim());
            MQTT_KEEP_ALIVE = Integer.valueOf(properties.getProperty("keepalive").trim());
        }

        // java.util.Properties is not a YAML parser: each "key: value" line is read as a
        // flat property and the indentation is ignored, so the keys are "host", not "com.mqtt.host".
        private static Properties loadMqttYml() {
            Properties properties = new Properties();
            try (InputStream inputStream = ymlUtil.class.getResourceAsStream("/application.yml")) {
                if (inputStream == null) {
                    throw new IllegalStateException("application.yml not found on the classpath");
                }
                properties.load(inputStream);
                return properties;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
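Since ymlUtil reads the yml file with `java.util.Properties`, it helps to see what that parser makes of YAML-style lines. The sketch below is a small stand-alone demonstration (`PropertiesYamlDemo` is a made-up name). It shows that nesting is dropped, and that a line such as `clientId: exqmId # comment` keeps the comment as part of the value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Demonstrates how java.util.Properties reads YAML-style "key: value" lines. */
class PropertiesYamlDemo {

    /** Parses the given text with Properties.load, the same call ymlUtil relies on. */
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return properties;
    }

    public static void main(String[] args) {
        String yml = String.join("\n",
                "com:",
                "  mqtt:",
                "    host: tcp://127.0.0.1:1883",
                "    clientId: exqmId # must be unique",
                "    timeout: 10");
        Properties p = parse(yml);
        System.out.println(p.getProperty("host"));          // tcp://127.0.0.1:1883
        System.out.println(p.getProperty("clientId"));      // exqmId # must be unique
        System.out.println(p.getProperty("com.mqtt.host")); // null
    }
}
```

For anything beyond a flat file like this one, Spring's own binding (the @Value approach mentioned above) is probably the safer route.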
4. Next, create a MqttPushClient class to publish messages:

    package cn.zl.springbootmqtt.pub;

    import cn.zl.springbootmqtt.util.ymlUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

    @Slf4j
    public class MqttPushClient {
        private MqttClient client;

        private static volatile MqttPushClient mqttPushClient = null;

        // Lazily created singleton (double-checked locking on a volatile field).
        public static MqttPushClient getInstance() {
            if (null == mqttPushClient) {
                synchronized (MqttPushClient.class) {
                    if (null == mqttPushClient) {
                        mqttPushClient = new MqttPushClient();
                    }
                }
            }
            return mqttPushClient;
        }

        private MqttPushClient() {
            connect();
        }

        public void connect() {
            try {
                // The client ID is hard-coded as "pub" so that it differs from the subscriber's.
                client = new MqttClient(ymlUtil.MQTT_HOST, "pub", new MemoryPersistence());
                MqttConnectOptions options = new MqttConnectOptions();
                options.setUserName(ymlUtil.MQTT_USER_NAME);
                options.setPassword(ymlUtil.MQTT_PASSWORD.toCharArray());
                options.setConnectionTimeout(ymlUtil.MQTT_TIMEOUT);
                options.setKeepAliveInterval(ymlUtil.MQTT_KEEP_ALIVE);
                // Start with a clean session on every connect.
                options.setCleanSession(true);
                client.setCallback(new PushCallBack());
                client.connect(options);
            } catch (Exception e) {
                log.error("failed to connect to the MQTT broker", e);
            }
        }

        // Publishes with QoS 0 and without the retained flag.
        public void publish(String topic, MqttMessage pushMessage) {
            publish(0, false, topic, pushMessage);
        }

        private void publish(int qos, boolean retained, String topic, MqttMessage pushMessage) {
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            mqttMessage.setRetained(retained);
            mqttMessage.setPayload(pushMessage.getPayload());

            MqttTopic mqttTopic = client.getTopic(topic);
            if (null == mqttTopic) {
                log.error("topic not exist");
                return;
            }
            try {
                // Block until the broker has handled the message.
                MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
                token.waitForCompletion();
            } catch (MqttException e) {
                log.error("failed to publish to topic {}", topic, e);
            }
        }
    }
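The getInstance method above builds its singleton with double-checked locking on a volatile field. As an alternative, here is a rough sketch of the initialization-on-demand holder idiom, which gets the same lazy, thread-safe behavior from JVM class initialization. `HolderSingleton` is a hypothetical stand-in for the client class and opens no connection.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a client class such as MqttPushClient; it opens no connection. */
class HolderSingleton {

    // Counts constructor calls so the demo can show the instance is built only once.
    static final AtomicInteger CONSTRUCTED = new AtomicInteger();

    private HolderSingleton() {
        CONSTRUCTED.incrementAndGet();
    }

    // The JVM initializes this nested class on first access, and class initialization is thread-safe.
    private static final class Holder {
        static final HolderSingleton INSTANCE = new HolderSingleton();
    }

    static HolderSingleton getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        HolderSingleton a = getInstance();
        HolderSingleton b = getInstance();
        System.out.println(a == b);            // true
        System.out.println(CONSTRUCTED.get()); // 1
    }
}
```

One thing to keep in mind with this idiom: an exception escaping the constructor would fail class initialization for good, so a constructor that connects to a broker should handle its own errors.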
5. Create a class for the callbacks by implementing the MqttCallback interface:

    package cn.zl.springbootmqtt.pub;

    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;

    public class PushCallBack implements MqttCallback {

        // Called when the connection drops. This only reports the loss; it does not reconnect.
        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("Connection lost: " + throwable);
        }

        // Called for every message received on a subscribed topic.
        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
            System.out.println("Received topic   : " + topic);
            System.out.println("Received QoS     : " + mqttMessage.getQos());
            System.out.println("Received payload : " + new String(mqttMessage.getPayload()));
        }

        // Called once delivery of a published message has completed.
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            System.out.println("deliveryComplete..." + iMqttDeliveryToken.isComplete());
        }
    }
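The connectionLost callback above only prints a message; nothing in it actually reconnects. A minimal sketch of a retry loop with a doubling delay follows. `Reconnector` and `retry` are hypothetical names, and the `attempt` argument stands in for whatever reconnect call the client offers. Recent Paho versions also provide `MqttConnectOptions.setAutomaticReconnect(true)`, which may be the simpler choice.

```java
import java.util.concurrent.Callable;

/** Hypothetical retry helper; the attempt passed in stands in for a real reconnect call. */
class Reconnector {

    /**
     * Calls attempt until it returns true or maxAttempts is reached, sleeping between
     * tries and doubling the delay each time.
     * Returns the number of attempts used, or -1 if every attempt failed or the wait was interrupted.
     */
    static int retry(Callable<Boolean> attempt, int maxAttempts, long initialDelayMillis) {
        long delay = initialDelayMillis;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                if (Boolean.TRUE.equals(attempt.call())) {
                    return i;
                }
            } catch (Exception e) {
                System.err.println("attempt " + i + " failed: " + e);
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return -1;
                }
                delay *= 2;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for a reconnect that succeeds on the third call.
        int used = retry(() -> ++calls[0] >= 3, 5, 10);
        System.out.println("connected after " + used + " attempts"); // connected after 3 attempts
    }
}
```

If something like this is wired into a real client, it is probably best run on its own thread rather than inside the callback itself, so the callback returns quickly.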
6. Create a class for testing. I'm used to controllers, so I created a PushController class, nice and clear:

    package cn.zl.springbootmqtt.controller;

    import cn.zl.springbootmqtt.pub.MqttPushClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;

    public class PushController {
        public static void main(String[] args) {
            MqttPushClient instance = MqttPushClient.getInstance();
            // An MqttMessage created this way carries an empty payload.
            MqttMessage message = new MqttMessage();
            instance.publish("topicPub", message);
        }
    }
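7. With the publishing client in place, build the subscribing client the same way. Create a MqttSubClient class. Pay attention here: the ClientId must not be repeated.

    // ClientIds must not repeat, so the ClientId from the yml is not read through the utility class here
    client = new MqttClient(ymlUtil.MQTT_HOST, "xxx", new MemoryPersistence());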
    package cn.zl.springbootmqtt.sub;

    import cn.zl.springbootmqtt.util.ymlUtil;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

    public class MqttSubClient {
        private MqttClient client;

        private static volatile MqttSubClient mqttSubClient = null;

        // Lazily created singleton (double-checked locking on a volatile field).
        public static MqttSubClient getInstance() {
            if (null == mqttSubClient) {
                synchronized (MqttSubClient.class) {
                    if (null == mqttSubClient) {
                        mqttSubClient = new MqttSubClient();
                    }
                }
            }
            return mqttSubClient;
        }

        private MqttSubClient() {
            connect();
        }

        private void connect() {
            try {
                // The client ID is hard-coded as "sub" so that it differs from the publisher's "pub".
                client = new MqttClient(ymlUtil.MQTT_HOST, "sub", new MemoryPersistence());
                MqttConnectOptions options = new MqttConnectOptions();
                options.setUserName(ymlUtil.MQTT_USER_NAME);
                options.setPassword(ymlUtil.MQTT_PASSWORD.toCharArray());
                options.setConnectionTimeout(ymlUtil.MQTT_TIMEOUT);
                options.setKeepAliveInterval(ymlUtil.MQTT_KEEP_ALIVE);
                // Start with a clean session on every connect.
                options.setCleanSession(true);
                // Use the subscriber's own callback class from step 8.
                client.setCallback(new SubCallBack());
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Subscribes with QoS 0.
        public void subscribe(String topic) {
            subscribe(topic, 0);
        }

        public void subscribe(String topic, int qos) {
            try {
                client.subscribe(topic, qos);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }
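The yml file from step 2 lists several topics in one comma-separated value (`topic: good,test,yes`), although the clients here subscribe to a single hard-coded topic. Here is a rough sketch of turning such a value into the arrays a multi-topic subscribe would need. `TopicList` and its methods are hypothetical helpers, not part of the original project.

```java
import java.util.Arrays;

/** Hypothetical helper that prepares topic and QoS arrays from a comma-separated setting. */
class TopicList {

    /** Splits on commas, trims each entry, and drops empty ones. */
    static String[] parse(String csv) {
        return Arrays.stream(csv.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .toArray(String[]::new);
    }

    /** Builds a QoS array that assigns the same level to every topic. */
    static int[] sameQos(int count, int qos) {
        int[] levels = new int[count];
        Arrays.fill(levels, qos);
        return levels;
    }

    public static void main(String[] args) {
        String[] topics = parse("good,test,yes");
        int[] qos = sameQos(topics.length, 0);
        System.out.println(Arrays.toString(topics)); // [good, test, yes]
        System.out.println(Arrays.toString(qos));    // [0, 0, 0]
        // With Paho, these could then be passed to MqttClient.subscribe(String[] topicFilters, int[] qos).
    }
}
```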
8. The callback class for the subscribing client. Create SubCallBack:

    package cn.zl.springbootmqtt.sub;

    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;

    public class SubCallBack implements MqttCallback {

        // Called when the connection drops. This only reports the loss; it does not reconnect.
        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("Connection lost: " + throwable);
        }

        // Called for every message received on a subscribed topic.
        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
            System.out.println("Received topic   : " + topic);
            System.out.println("Received QoS     : " + mqttMessage.getQos());
            System.out.println("Received payload : " + new String(mqttMessage.getPayload()));
        }

        // Called once delivery of a published message has completed.
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            System.out.println("deliveryComplete..." + iMqttDeliveryToken.isComplete());
        }
    }
9. Create a SubController class to subscribe to the topic:

    package cn.zl.springbootmqtt.controller;

    import cn.zl.springbootmqtt.sub.MqttSubClient;

    public class SubController {
        public static void main(String[] args) {
            MqttSubClient instance = MqttSubClient.getInstance();
            // Same topic name that PushController publishes to.
            instance.subscribe("topicPub");
        }
    }
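Summary: this post was getting too long to read comfortably, so the testing is in the next one, MQTT connection testing with Java (part 2). One more note: someone asked why I paste all the imports. First, it makes copying easy (Ctrl+A, Ctrl+C, Ctrl+V). Second, getting a package wrong is annoying, and sometimes an error is nothing more than a wrong import. If you spot a problem, please point it out.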