0%

Springboot集成netty

在尝试了Mqtt、Kafka、RabbitMq几种有关消息的技术后,我越发对消息的通信感兴趣,无意中了解到了netty,没错,第一时间我还是懵逼的,但是我看好多人都说netty是深入学习Java的必经之路,那,我,胖虎,要学,

首先什么是netty?

我大概知道了,但是我不会(其实就是说不清楚,半懂不懂的),推荐一篇文章,我觉得很Ok,连我都看懂了,你不会看不懂吧?传送门:什么是netty–通俗易懂

所以,看完大概知道啥是netty了吧,如果还不能理解,那就,,,再看一遍吧(反正我是读了两三遍),接下来就试着使用Sprongboot来集成一下netty,实现一下简单的客户端与服务端的通信,

1.新建项目,导入依赖,

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>

2.依赖更新一下,我是先开始写客户端的,这里参考了一些文档,但是我真记不得是哪篇了,像我其他笔记,有借鉴我都会贴出来的,抱歉,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package cn.zl.netty.work;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.Scanner;

/**
* @Description:聊天客户端
* @Author: zllwsy
* @Date: 2020/10/29 9:55
*/
public class MyChatClient {
public static void main(String[] args) {
//1.创建一个线程组
EventLoopGroup loopGroup = new NioEventLoopGroup();
//2.创建客户端启动助手,完成相关配置
Bootstrap bootstrap = new Bootstrap()
//3.设置线程组
.group(loopGroup)
//4.设置客户端通道的实现类
.channel(NioSocketChannel.class)
//5.创建一个初始化通道对象 ,//步骤6在MyChatClientInitializer中
.handler(new MyChatClientInitializer());

try {
System.out.println("Client is ready ...");
//7.启动客户端去连接服务器端,connect方法是异步的,sync方法是同步阻塞的
ChannelFuture future = bootstrap.connect("127.0.0.1", 52002).sync();
Channel channel = future.channel();

//8.手动在控制台输入内容,然后enter发送
Scanner scanner = new Scanner(System.in);
System.out.println("please enter ...");
while (true) {
String msg = scanner.nextLine();
if ("exit".equals(msg)) break;
System.out.println("msg is push ...");
channel.writeAndFlush(msg + System.lineSeparator());
}
channel.close();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//9.关闭连接
loopGroup.shutdownGracefully();
System.out.println("Client is end ...");
}

}
}

3.由于自己也是学习,我就在代码中写入了很详细的步骤和注释内容,接着进行客户端的初始化,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package cn.zl.netty.work;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
* @Description:客户端初始化器
* @Author: zllwsy
* @Date: 2020/10/29 9:56
*/
public class MyChatClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
//编码器
.addLast(new StringDecoder(CharsetUtil.UTF_8))
//解码器
.addLast(new StringEncoder(CharsetUtil.UTF_8))
//6.在pipeline中添加自定义的Handler
.addLast(new MyChatClientHandler());
}
}

4.客户端的处理器,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package cn.zl.netty.work;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;


/**
* @Description:客户端的处理器
* @Author: zllwsy
* @Date: 2020/10/29 9:58
*/
public class MyChatClientHandler extends SimpleChannelInboundHandler<String> {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler.channelActive");
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("client: " + ctx);
Channel channel = ctx.channel();
System.out.println("===========|服务端发来的消息: " + msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

5.服务端和客户端其实差不了太多,服务端的类不一样,客户端是提供ip和端口,然后服务端去绑定,也可以称为监听,这样才能获取到客户端发来的消息,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package cn.zl.netty.work;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* @Description:聊天服务器
* @Author: zllwsy
* @Date: 2020/10/29 9:37
*/
public class MychatServer {
public static void main(String[] args) {
//1.创建一个线程组,用来接收客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//2.创建一个线程组,处理网络操作
EventLoopGroup workerGroup = new NioEventLoopGroup();
//3.创建服务端启动助手来配置参数
ServerBootstrap serverBootstrap = new ServerBootstrap()
//4.设置两个线程组
.group(bossGroup, workerGroup)
//5.使用NioServerSocketChannel作为服务器端通道的实现
.channel(NioServerSocketChannel.class)
//6.设置线程队列中等待连接的个数
.option(ChannelOption.SO_BACKLOG,128)
//7.保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE,true)
//8.创建一个通道初始化对象 //步骤9在MyChatServerInitializer中
.childHandler(new MyChatServerInitializer());
try {
System.out.println("Server is ready ...");
//10.绑定端口,bind方法是异步的,sync方法是同步阻塞的
ChannelFuture future = serverBootstrap.bind(52002).sync();
System.out.println("Server is starting ...");
//11.关闭通道,关闭线程组
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//关闭连接
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
System.out.println("Server is end ...");
}
}
}

6.然后是服务端的初始化类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package cn.zl.netty.work;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
* @Description:聊天服务端的初始化器
* @Author: zllwsy
* @Date: 2020/10/29 9:39
*/
public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
//编码器
.addLast(new StringDecoder(CharsetUtil.UTF_8))
//解码器
.addLast(new StringEncoder(CharsetUtil.UTF_8))
//9.在pipeline中添加自定义的Handler
.addLast(new MyChatServerHandler());
}
}

7.最后是服务端的处理器,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package cn.zl.netty.work;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
* @Description:聊天服务器的处理器
* @Author: zllwsy
* @Date: 2020/10/29 9:42
*/
public class MyChatServerHandler extends SimpleChannelInboundHandler<Object> {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

public static final String SEPARATOR = System.lineSeparator();


protected void channelRead(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println("MyChatServerHandler.handlerAdded");
Channel channel = ctx.channel();
channelGroup.add(channel);
channelGroup.writeAndFlush("Server>" + channel.remoteAddress() + "add" + SEPARATOR + "加入\n");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception{
System.out.println("MyChatServerHandler.channelACTIVE");
System.out.println("client" + ctx.channel().remoteAddress() + "处于连接状态");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx,Object msg) throws Exception{
System.out.println("Server: " + ctx);
Channel channel = ctx.channel();
System.out.println("===========|客户端发来的消息: " + channel.remoteAddress() + "> " + msg);

for (Channel ch : channelGroup) {
if (ch != channel) {
ch.writeAndFlush(channel.remoteAddress() + "> " + msg + SEPARATOR);
} else {
ch.writeAndFlush("自己" + "> " + msg + SEPARATOR);
}
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyChatServerHandler.channelInactive");
System.out.println("client" + ctx.channel().remoteAddress() + "处于断开状态");
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyChatServerHandler.handlerRemoved");

Channel channel = ctx.channel();
channelGroup.remove(channel); // Netty会自动寻找断掉的channel,然后移除,可以不用手动移除
channelGroup.writeAndFlush("Server> " + channel.remoteAddress() + " remove" + SEPARATOR);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

10.然后开始测试,将服务端和客户端分别启动,服务端:

6.1

客户端:

6.2

11.客户端给服务端发送消息,我是在控制台输入,其他特殊需求,也可以直接在代码里面写,或者页面实现,

客户端:

6.3

服务端:

6.4

总结,可以看到消息确实接收到了,但是,有一点难受,就是我实现不了服务端给客户端发送消息,我debug跑了几遍,在客户端监听不了ip和端口,我无法确切的去接收服务端发来的消息,我知道是可以实现的,但是我菜呀,我会继续学习,在后续中实现了,我就进行更新。

----------本文结束感谢您的阅读----------