《Netty 实战》 第一部分.md

1175

Netty 前言

Netty 异步和数据驱动

Netty 介绍

Netty 组成部分

  • Channel
  • Callback
  • Future : Netty 自己提供了 ChannelFuture,用于在执行异步操作时使用 。
  • Event 和 Handler

整合:

  • Future Callback 和 Handler
  • Selector Event 和 Event Loop

关于本书

Netty 第一个应用

Netty 架构模型的组件总览

  • Bootstrap
  • Channel
  • ChannelHandler
  • ChannelPipeline
  • EventLoop
  • ChannelFuture

Netty 的 Channel, Event 和 I/O

image20210730154558219.png

可以简单理解为 一个 EventLoop 的生命周期依赖于一个线程,EventLoopGroup 管理多个 EventLoop 。

当创建一个一个 Channel, Netty 通过一个单独的 EventLoop 实例来注册,所有 Channel 的 I/O 始终用相同的线程来执行

Netty 中 Bootstrapping 的作用

  • Bootstrap : 用于客户端,有一个 EventLoopGroup
  • ServerBootstrap : 用于服务端,有两个 EventLoopGroup

image20210730154945460.png

初识 Netty 的 ChannelHandler 和 ChannelPipeline

ChannelPipeline 是 ChannelHandler 链的容器 。

image20210730155212604.png

近距离观察 Netty 的 ChannelHandler

pipeline 中每个的 ChannelHandler 主要负责转发事件到链中的下一个处理器。直接重写 ChannelHandler 也是可以的,不过比较麻烦,Netty 提供了一系列 子类型 抽象类,进一步减少模板方法 。

编码器、解码器:

入站消息初始为字节流,解码器可以将其转换为 Java 对象。而对于出战消息,编码器可以将对象转换为字节流 。其基类名字类似 ByteToMessageDecoderMessageToByteEncoder

还有类似于 ProtobufEncoderProtobufDecoder 用于支持谷歌的 protocol buffer

SimpleChannelHandler

最简单的收到消息后的业务处理 Handler ,这里消息是 Java 对象类型,用泛型指定,一般前面需要添加解码器将比特流解码成消息 。

不过需要注意,SImpleChannelHandler 中不能阻塞线程,如果阻塞了是不利于高性能的 。

Netty 部件及概述总结

Netty 核心功能介绍

Netty 核心之 Transport ( 传输 )

  • NIO
  • OIO
  • Local ( 本地 )
  • Embedded ( 内嵌 )

Netty 实现阻塞式:

public class NettyOioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();        //1

            b.group(group)                                    //2
             .channel(OioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {//3
                 @Override
                 public void initChannel(SocketChannel ch) 
                     throws Exception {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {            //4
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind().sync();  //6
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();        //7
        }
    }
}

Netty 实现非阻塞:

public class NettyNioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();    //1
            b.group(new NioEventLoopGroup(), new NioEventLoopGroup())   //2
             .channel(NioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {    //3
                 @Override
                 public void initChannel(SocketChannel ch) 
                     throws Exception {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {    //4
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ctx.writeAndFlush(buf.duplicate())                //5
                                .addListener(ChannelFutureListener.CLOSE);
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind().sync();                    //6
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();                    //7
        }
    }
}

基于 Netty 传输的 API

image20210731152758955.png

每个 Channel 都会分配一个 ChannelPipeline 和 ChannelConfig 。 ChannelConfig 是 Channel 配置 。

ChannelPipeline 容纳了使用 ChannelHandler 的实例

Channel 的常用方法:

方法名描述
eventLoop()返回分配给该 Channel 的 EventLoop
pipeline()返回分配给该 Channel 的 ChannelPipeline
isActive()返回 Channel 是否激活,已激活说明与远程连接对等
localAddress()返回已绑定的本地 SocketAddress
remoteAddress()返回已绑定的远程 SocketAddress
write()写数据,数据会作为出战数据通过 ChannelPipeline
flush()刷新
writeAndFlush()调用 write 后 flush

Channel 的操作是线程安全的,不需要考虑同步问题

Netty 中包含的 Transport

方法名称描述
NIOio.netty.channel.socket.nio基于 java.nio.channels 的工具包,使用选择器作为基础方法
OIOio.netty.channel.socket.oio基于 java.net 的工具包,阻塞流
Localio.netty.channel.local在虚拟机之间本地通信
Embeddedio.netty.channel.embedded嵌入传输,主要用于测试 ChannelHandler

NIO

NIO 中我们可以注册一个通道并监听通道状态的改变,通道的状态有以下几种改变:

  • 新的 Channel 被接受且准备好
  • Channel 连接完成
  • Channel 中有数据并已准备好读取
  • Channel 发送数据出去

可以用于给选择器来管理多个 Channel,选择器支持的操作在 SelectionKey 中定义:

操作名称描述
OP_ACCEPT有新连接时得到通知
OP_CONNECT连接完成后得到通知
OP_READ准备好读取数据时得到通知
OP_WRITE写入更多数据到通道时得到通知

Netty 中有一种仅适用于 NIO 传输的功能 zero-file-copy 零文件拷贝 、

OIO

Netty 中 OIO 是一种妥协,其内部使用了超时时间来进行处理,设定一个 Socket 的 timeout,并在 SocektTimeoutException 异常抛出时捕获然并继续下一个循环。

同个JVM 内本地 Transport

可以使运行到同个 JVM 中的服务器和客户端之间提供异步通信,支持所有 Netty 常见的传输实现的 API 。

内嵌 Transport

提供 ChannelHandler 实例到其他 ChannelHandler 的传输,通常用于测试 ChannelHandler 的实现。

Netty 中 Transport 的使用情况

  • OIO 在低连接数、需要低延迟时、阻塞时使用
  • NIO 在高连接数时使用
  • Local 在同一给 JVM 内通信时使用
  • Embedded 测试 ChannelHandler 时使用

Netty 核心之 Buffer ( h缓冲 )

NIO 中提供了 ByteBuffer,然而其的作用受到限制,也没有优化。

Netty 中提供了一个强大的新的缓冲类 ByteBuf,其效率与 ByteBuffer 相当,但使用灵活。

Netty 中的 Buffer API

  • ByteBuf
  • ByteBufHolder

当开发者忘记释放资源时,Netty 会根据 reference-counting( 引用计数 ) 来自动进行 GC 。

以下是 Netty 中 Buffer 的优势

  • 自定义缓冲类型
  • 通过内置的复合缓冲类型实现零拷贝
  • 扩展性好
  • 不需要调用 flip() 来切换 读/写 模式
  • 读取和写入索引分开
  • 方法链
  • 引用计数
  • Pooling ( 池 )

Netty 字节数据的容器 ByteBuf

双索引法,Buf 中有两个指针,一个用来读,一个用来写,当读写指针重合时,在调用 read 的相关方法,则会抛出 IndexOutOfBoundsException 异常 。

其中 readwrite 的相关方法都会在写入或读取后更新指针位置,而 set get 的方法则直接读取或写入当前指针的数据,不会更新指针。

ByteBuf 使用模式:

  1. HEAP BUFFER ( 堆缓冲区 )

直接通过数组实现,数据存在于堆空间 。

可以通过 ByteBuf.array() 获取数组对象 (需要该缓冲使用堆模式,可使用 ByteBuf.hasArray() 方法判断)

  1. DIRECT BUFFER ( 直接缓冲区 )

在 JDK1.4 中被引入 NIO 的ByteBuffer 类允许 JVM 通过本地方法调用分配内存,其目的是

  • 通过免去中间交换的内存拷贝, 提升IO处理速度; 直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外。
  • DirectBuffer 在 -XX:MaxDirectMemorySize=xxM大小限制下, 使用 Heap 之外的内存, GC对此”无能为力”,也就意味着规避了在高负载下频繁的GC过程对应用线程的中断影响.(详见http://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html.)
  1. COMPOSITE BUFFER ( 复合缓冲区 )

允许创建多个 ByteBuf,然后提供一个视图将它们组合起来 。Netty 提供了 ByteBuf 的子类 CompositeByteBuf 类来处理复合缓冲区,它只是一个视图 。

如下图:

image20210801103619658.png

CompositeByteBuf.hashArray() 总是返回 false 。

Netty 字节级别操作

随机访问

image20210801103915434.png

可丢弃字节的字节

可以通过调用 discardReadBytes() 来回收已经读取的字节空间,会将可读字节复制到开头,并相应的移动读写指针

可读字节

索引管理

可以通过 readIndex(int) 或 writeIndex(int) 将指针移动到指定位置,或者调用 clear() 将指针设置为 0

查询操作

通过 forEachByte 方法可以查找某个字符并返回下标,同时 ByteBufProcessor 中定义了一系列字符常量,比如需要查找回车符:

int index = buffer.forEachByte(ByteBufProcessor.FIND_CR);

衍生的缓冲区

“衍生的缓冲区”是代表一个专门的展示 ByteBuf 内容的“视图”。这种视图是由 duplicate(), slice(), slice(int, int),readOnly(), 和 order(ByteOrder)方法创建的。所有这些都返回一个新的 ByteBuf 实例包括它自己的 reader, writer 和标记索引。

拷贝缓冲区

可以使用 copy() 或 copy(int, int) 来拷贝一个 缓冲区

读/写操作

  • get()/set() 操作从给定的索引开始,保持不变
  • read()/write() 操作从给定的索引开始,与字节访问的数量来适用,递增当前的写索引或读索引

ByteBufHolder 的使用

ByteBufHolder 还提供了对于 Netty 的高级功能,如缓冲池,其中保存实际数据的 ByteBuf 可以从池中借用,如果需要还可以自动释放。

ByteBuf 分配

ByteBufAllocator

Netty 提供了两种 ByteBufAllocator 的实现,一种是 PooledByteBufAllocator,用ByteBuf 实例池改进性能以及内存使用降到最低,此实现使用一个“jemalloc”内存分配。其他的实现不池化 ByteBuf 情况下,每次返回一个新的实例。

Netty 默认使用 PooledByteBufAllocator,我们可以通过 ChannelConfig 或通过引导设置一个不同的实现来改变。更多细节在后面讲述 ,见 [Chapter 9, "Bootstrapping Netty Applications"](https://waylau.gitbooks.io/essential-netty-in-action/CORE FUNCTIONS/Bootstrapping.html)

常用方法

image20210801105124048.png

Unpooled 非池化缓存

Unpooled 提供了静态辅助类来创建 ByteBuf 实例

image20210801105142676.png

ByteBufUtil

Netty 引用计数器

引用计数器本身并不复杂;它能够在特定的对象上跟踪引用的数目,实现了ReferenceCounted 的类的实例会通常开始于一个活动的引用计数器为 1。而如果对象活动的引用计数器大于0,就会被保证不被释放。当数量引用减少到0,将释放该实例。需要注意的是“释放”的语义是特定于具体的实现。最起码,一个对象,它已被释放应不再可用。

主要是 ReferenceCounted 接口用于规定计数器,其中 ByteBuf 于 ByteBufHolder 都实现了该接口

一般而言,调用 reelease 方法会直接将引用计数器置为零 。

Netty 中的 ChannelHandler 和 Channel Pipeline

  • Channel
  • ChannelHandler
  • ChannePipeline
  • ChannelHandlerContext

ChannelHandler 家族

Channel 生命周期

状态描述
channelUnregistered已创建但未注册到 EventLoop
channelRegistered已创建并已注册到 EventLoop
channelActive变为活跃状态 ( 连接到了远程主机 ),现在可以接收和发送数据
chennelInactive变为非活跃状态,远程连接断开。

image20210801110406175.png

ChannelHandler 生命周期

类型描述
handlerAdded当 ChannelHandler 添加到 ChannelPipeline 调用
handlerRemoved当 ChannelHandler 从 ChannelPipeline 移除时调用
exceptionCaught当 ChannelPipeline 执行抛出异常时调用

ChannelHandler 子接口

Netty 提供 2 个重要的 ChannelHandler 子接口:

  • ChannelInboundHandler - 处理进站数据和所有状态改变事件
  • ChannelOutBoundHandler - 处理出站数据,允许拦截各种操作

ChannelHandler 适配器

Netty 提供了一个简单的 ChannelHandler 框架实现,给所有声明方法签名。这个类 ChannelHandlerAdapter 的方法,主要推送事件 到 pipeline 下个 ChannelHandler 直到 pipeline 的结束。这个类 也作为 ChannelInboundHandlerAdapter 和ChannelOutboundHandlerAdapter 的基础。所有三个适配器类的目的是作为自己的实现的起点;您可以扩展它们,覆盖你需要自定义的方法。

ChannelInboundHandler

ChannelInboundHandler 生命周期如下

image20210801111111855.png

使用中可以继承 ChannelInboundHandlerAdapter 并重写 channelRead 方法,不过这种写法不会帮你释放内存。

还可以继承 SimpleChannelInboundHandler 并重写 channelRead0 方法,该类带了一个泛型可以帮助将数据强转并自动释放内存 。

ChannelOutboundHandler

方法列表

image20210801111748130.png

几乎所有的方法都将 ChannelPromise 作为参数,一旦请求结束要通过 ChannelPipeline 转发的时候,必须通知此参数。

ChannelPromise vs. ChannelFuture

ChannelPromise 是 特殊的 ChannelFuture,允许你的 ChannelPromise 及其 操作 成功或失败。所以任何时候调用例如 Channel.write(...) 一个新的 ChannelPromise将会创建并且通过 ChannelPipeline传递。这次写操作本身将会返回 ChannelFuture, 这样只允许你得到一次操作完成的通知。Netty 本身使用 ChannelPromise 作为返回的 ChannelFuture 的通知,事实上在大多数时候就是 ChannelPromise 自身(ChannelPromise 扩展了 ChannelFuture)

如前所述,ChannelOutboundHandlerAdapter 提供了一个实现了 ChannelOutboundHandler 所有基本方法的实现的框架。 这些简单事件转发到下一个 ChannelOutboundHandler 管道通过调用 ChannelHandlerContext 相关的等效方法。你可以根据需要自己实现想要的方法。

资源管理

当你通过 ChannelInboundHandler.channelRead(...) 或者 ChannelOutboundHandler.write(...) 来处理数据,重要的是在处理资源时要确保资源不要泄漏。

Netty 使用引用计数器来处理池化的 ByteBuf。所以当 ByteBuf 完全处理后,要确保引用计数器被调整。

引用计数的权衡之一是用户时必须小心使用消息。当 JVM 仍在 GC(不知道有这样的消息引用计数)这个消息,以至于可能是之前获得的这个消息不会被放回池中。因此很可能,如果你不小心释放这些消息,很可能会耗尽资源。

为了让用户更加简单的找到遗漏的释放,Netty 包含了一个 ResourceLeakDetector ,将会从已分配的缓冲区 1% 作为样品来检查是否存在在应用程序泄漏。因为 1% 的抽样,开销很小。

Netty 的 ChannelPipeline

ChannelPipeline 与 Channel 的绑定是永久性的,每当创建新的 Channel Netty 就会将其绑定一个 ChannelPipeline 。