Netty入门与异步模型
本文通过一个 Netty 入门案例来阐述 Netty 的异步模型。
netty 入门案例
Netty 是由 JBOSS 提供的一个开源框架,它的 Maven 坐标如下:
1 | <dependency> |
Netty 服务端编写
Netty 服务端实现步骤是什么?
- 创建 bossGroup 线程组: 处理网络事件–连接事件
- 创建 workerGroup 线程组: 处理网络事件–读写事件
- 创建服务端启动助手
- 设置 bossGroup 线程组和 workerGroup 线程组
- 设置服务端通道实现为 NIO
- 参数设置
- 创建一个通道初始化对象
- 向 pipeline 中添加自定义业务处理 handler
- 启动服务端并绑定端口,同时将异步改为同步
- 关闭通道和关闭连接池
代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37/**
* Netty服务端
*
* @name: NettyServer
* @author: terwer
* @date: 2022-04-21 14:41
**/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 1. 创建bossGroup线程组: 处理网络事件--连接事件,默认是2*处理器线程数目
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 2. 创建workerGroup线程组: 处理网络事件--读写事件,默认是2*处理器线程数目
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// 3. 创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 4. 设置bossGroup线程组和workerGroup线程组
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128) // 5. 设置服务端通道实现为NIO
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 6. 参数设置,设置活跃状态,child是设置workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() { // 7. 创建一个通道初始化对象
protected void initChannel(SocketChannel ch) throws Exception {
// 8. 向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyServerHandler());
}
});
// 9. 启动服务端并绑定端口,同时将异步改为同步
ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
System.out.println("服务器启动成功");
// 10. 关闭通道和关闭连接池(不是真正关闭,只是设置为关闭状态)
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
自定义服务端 Handler
1 | /** |
Netty 客户端编写
客户端实现步骤是什么?
- 创建线程组
- 创建客户端启动助手
- 设置线程组
- 设置客户端通道实现为 NIO
- 创建一个通道初始化对象
- 向 pipeline 中添加自定义业务处理 handler
- 启动客户端,等待连接服务端,同时将异步改为同步
- 关闭通道和关闭连接池
代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31/**
* Netty客户端
*
* @name: NettyClient
* @author: terwer
* @date: 2022-04-21 15:04
**/
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 1. 创建线程组
NioEventLoopGroup group = new NioEventLoopGroup();
// 2. 创建客户端启动助手
Bootstrap bootstrap = new Bootstrap();
// 3. 设置线程组
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() { // 4. 设置客户端通道实现为NIO
protected void initChannel(SocketChannel ch) throws Exception { // 5. 创建一个通道初始化对象
// 6. 向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyClientHandler());
}
});
// 7. 启动客户端,等待连接服务端,同时将异步改为同步
ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 9999)).sync();
// 8. 关闭通道和关闭连接池
channelFuture.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
自定义客户端 Handler
1 | /** |
netty 异步模型
基本介绍
当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在调用完成后,通过状态、通知和回调来通知调用者。
NIO 中的 I/O 操作都是异步的,包括 Bind、Write、Connect 等操作都会简单的返回一个 ChannelFuture。
调用者不能立即获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。
Netty 的异步模型建立在 future 和 callback 之上。callback 就是回调。
future 的核心思想是:
假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然是不合适的。可以在调用 fun 的时候,立马返回一个 future,后续可以通过 future 去监控方法 fun 的处理过程,就是 Future-Listener 机制。
Future 和 Future-Listener 机制
Future
表示异步的执行结果,可以通过它提供的方法检测任务是否完成。ChannelFuture 是它的一个子接口,可以添加监听器,当监听事件发生时,就会通知到监听器。
当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。
常用方法有:
sync():阻塞等待程序结果返回
isDone():判断当前操作是否完成
isSuccess():判断已完成操作是否成功
getCause():获取操作失败原因
isCanceled():判断当前操作是否取消
addListener():注册监听器,当操作已完成时(isDone 方法返回完成),会通知指定监听器。
如果 Future 对象已完成,则通知指定监听器。
Future-Listener 机制
给 Future 添加监听器,监听操作结果:
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13ChannelFuture channelFuture = serverBootstrap.bind(9999).sync();
System.out.println("服务器启动成功");
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("端口绑定成功!");
} else {
System.out.println("端口绑定失败!");
}
}
});1
2
3
4
5
6
7
8
9
10
11ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty客户端", CharsetUtil.UTF_8));
channelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("数据发送成功.");
} else {
System.out.println("数据发送失败.");
}
}
});