简述netty工作流程
Netty核心组件介绍
- EventLoopGroup
- 看名字就知道,这是用来管理EventLoop的,同时将接收到的请求按轮询的方式转发给EventLoop
- EventLoop
- 负责将EventLoopGroup转发过来的请求(SocketChannel)注册到selector,并监听后续事件
- pipeline
- 每一个channel都有一个对应的pipeline,pipeline维护了一个双向链表,节点是ChannelHandlerContext,分出站(tail -> head)和入站(head -> tail)两个方向
- ChannelHandler
- 真正干活的,用来处理各种事件(读,写,连接等),分入站和出站两种,分别处理入站和出站所产生的事件
- ChannelHandlerContext
- ChannelHandler上下文对象,包含了跟当前连接关联的所有资源,简单来说就是可以拿到很多东西,此外,还是pipeline中的双向链表的结点
- ChannelFuture
- Channel异步IO操作的结果,该实例可提供有关IO操作的返回结果或状态
Netty工作流程
netty采用的是主从reactor多线程模型
作为服务端而言
- 主reactor,也就是BossGroup(EventLoopGroup) 会处理连接请求,并把对应的SocketChannel交给WorkGroup(EventLoopGroup)
- WorkGroup拿到SocketChannel后,会按照轮询的方式获取一个自己所管理的EventLoop,并把该SocketChannel注册到该EventLoop所持有的selector上,并监听后续事件
- 监听到入站或出站事件后,会调用pipeline中的入站处理器链(InBoundHandler)或出站处理器链(OutBoundHandler)进行处理
Demo
Server
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Demo Netty server.
 *
 * <p>Creates two event loop groups (one passed as the parent/boss group, one as the
 * child/worker group), binds to port 6666, installs a {@code ServerInBoundHandler}
 * on every accepted {@code SocketChannel}, and then blocks until the server channel
 * is closed. Both groups are shut down gracefully on exit.
 */
public class Server {

    /**
     * Starts the server and blocks the calling thread until the server channel closes
     * or the thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            ChannelFuture cf = serverBootstrap
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    // childHandler configures each accepted connection's pipeline.
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ServerInBoundHandler());
                        }
                    })
                    .bind(6666)
                    // Wait for the bind to finish before reporting startup.
                    .sync();
            System.out.println("服务器启动完成");
            // Block here until the server channel is closed.
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
|
Client
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Demo Netty client.
 *
 * <p>Connects to 127.0.0.1:6666, installs a {@code ClientInBoundHandler} on the
 * channel, and blocks until the channel is closed. The event loop group is shut
 * down gracefully on exit.
 */
public class Client {

    /**
     * Connects to the server and blocks the calling thread until the connection closes
     * or the thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        EventLoopGroup workGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            ChannelFuture connect = bootstrap
                    .group(workGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ClientInBoundHandler());
                        }
                    })
                    .connect("127.0.0.1", 6666)
                    // Wait for the connect attempt to complete.
                    .sync();
            // Block here until the channel is closed.
            connect.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}
|
ServerInBoundHandler
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil;
/**
 * Server-side inbound handler for the demo.
 *
 * <p>Prints a line for every inbound lifecycle callback and answers each received
 * buffer with a fixed UTF-8 greeting.
 *
 * <p>None of the overrides call {@code super} or fire the event onward, so inbound
 * events stop at this handler; it is written to be the only/last inbound handler in
 * the pipeline.
 */
public class ServerInBoundHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Server: channel要被注册了");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Server: channel注销了");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Server: channel处于活跃状态");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Server: channel不活跃了");
    }

    /**
     * Prints the received bytes decoded as UTF-8 and writes a fixed reply.
     *
     * @param ctx context used to write the reply
     * @param msg the inbound message; cast to {@code ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            System.out.println("Server: 读事件触发,读取到的内容为: " + buf.toString(CharsetUtil.UTF_8));
            ctx.writeAndFlush(Unpooled.copiedBuffer("我是服务端", CharsetUtil.UTF_8));
        } finally {
            // The message is consumed here and not passed on, so this handler owns it:
            // release the reference-counted buffer to avoid leaking it.
            buf.release();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Server: 读完了");
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("Server: 自定义事件触发了");
    }

    // NOTE(review): Netty invokes this callback whenever the writable state changes,
    // in either direction; the message below always says "not writable". Consider
    // checking ctx.channel().isWritable() if the distinction matters.
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Server: channel不能写了");
    }

    /**
     * Logs the failure message and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Server: 有异常! : " + cause.getMessage());
        // Do not keep a connection in an unknown state open after an error.
        ctx.close();
    }
}
|
ClientInBoundHandler
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil;
/**
 * Client-side inbound handler for the demo.
 *
 * <p>Sends a fixed UTF-8 greeting as soon as the channel becomes active, prints the
 * first reply it receives, and then closes the channel.
 */
public class ClientInBoundHandler extends ChannelInboundHandlerAdapter {

    /**
     * Prints the server's reply decoded as UTF-8 and closes the channel.
     *
     * @param ctx context used to close the channel
     * @param msg the inbound message; cast to {@code ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            System.out.println("服务端的回复: " + buf.toString(CharsetUtil.UTF_8));
            ctx.close();
        } finally {
            // The message is consumed here and not passed on, so this handler owns it:
            // release the reference-counted buffer to avoid leaking it.
            buf.release();
        }
    }

    /**
     * Sends the greeting once the connection is established.
     *
     * @param ctx context used to write the greeting
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("我是客户端", CharsetUtil.UTF_8));
    }
}
|
结果: