NIO 使用

721

前面学习了 I/O 流,是 java.io 里的包,了解到 java 在 14 版本后新加入了一套 I/O 流 ,在 java.nio 包中,与传统 I/O 流相比,我认为最主要区别有两个:

  1. 抽象出几个对象,分别是 ChannelBufferSelector,他们分别的作用接下来会讲到。
  2. 针对两个场景做了优化,一是读写文件的场景,在传统 I/O 流 中,对文件读和写是需要两个流,而在 NIO 中,一个 Channel 对象是可读又可写,(也可以设定只读,这种是特殊需求)。二是网络编程的环境中,在服务端传统 I/O 流 的写法中,一个客户端接入,服务端就需要开一个线程进行交互,而对于 NIO ,可以使用非阻塞式的写法,同时使用一个 Selector 对象去管理多个 Channel,可以做到使用一个线程与多个客户端交互。

先介绍一下 ChannelBuffer

Channel

顾名思义,就是管道,这个管道的一端是我们的程序,另一端是另一个地方,可能是网络数据,也可能是文件数据。与传统 I/O 流 不同,这个 Channel 对象是可以双向的,为什么说可以呢,因为有 ReadableByteChannelWritableByteChannel 两个接口给 Channel 加入了 Write()Read() 方法,顺带一提,Channel 的采用了一种接口修饰的设计模式,初始 Channel 基类只提供 isOpen()close() 两个方法。其他各种方法都是通过其他接口加上。

Buffer

顾名思义,就是缓冲区,其内部就是一个数组的实现,可以写入和读取数据,并且提供了各种类型的写入和读取。有四个成员变量需要注意:

  • capacity :容量,值这个 Buffer 最多能容纳的字节数
  • limit :当前操作的边界,读操作和写操作时要保证 position <= limit,等下会看见它的用途
  • position:当前指针,读或写时候的指针
  • mark:标记,可以标记当前位置,在源码中没看到作用,只是给外界提供一个标记的功能(甚至没看见外界获取 mark 的方法)

其中 limitposition 方法使用了新的链式调用 Setter/Getter 写法:

Buffer position(int position){
    // 省略参数合法判断
    this.position = positon;
    return this;
}

int position(){
    return this.position;
}

Buffer limit(int limit){
    // 省略参数合法判断
    this.limit = limit;
    return this;
}

int limit(){
    return this.limit;
}

此外,Buffer 有如下派生类:

  • ByteBuffer
  • LongBuffer
  • ShortBuffer

等等,可以自己去看,基本都能顾名思义

对于数据处理,Byffer 提供了 putget 方法写入和读取数据,注意,对于写入操作,是从 position 往后写入数据,写入后 position 会自动 +1,对于读取操作,是读取 positon 位置的数据,读取后会自动 +1,如果读的时候 positon > limit,则会抛异常。

可以看下图:

![GIF 2021-4-6 16-26-03.gif][1]

可以看到,在读写之前,会涉及到对指针的一些初始化,具体如下:

  • 在你使用过 Buffer 之后,下次要写之前,要先调用 position(0)limit(capacity) 来初始化指针,这里提供给了一个 clear() 方法自动帮我们完成以上操作。
  • 在通过管道将数据写入 Buffer 后,读取之前,因此此时对于写端,其 position 指向它写入的最后一个数据的下一个,limit 指向 capacity,因此要先调用 limit(positon()) 在调用 position(0),然后才可以正常读取数据,这里也提供了 flip() 方法,翻转这个 Buffer
// 要写入 Buffer

buffer.clear();
buffer.put(0);// 写入数据

// 写入后要读取

buffer.flip();
while(buffer.position() < buffer.limit()){
    buffer.get(); // 一个一个读
}

// 对于 byteBuffer ,如果是字符串,可以用以下方法直接读

String res = byteBuffer.flip().asCharBuffer().toString();

Buffer 和 Channel

因为 NIO 源码非常长,并且 NIOapi 并不好用,实际工作中很少,几乎并不会用到(但是其有积极意义,之后会说到)。因此这里直接通过举两个场景,说明如何使用 BufferChannel,只作简单了解。

NIO 里,操作只有两种情况,一是将 Buffer 里的数据写入 Channel,二是将 Channel 里的数据写入 Buffer,这里也是举两个操作作为例子:

文件

首先我们要拿到 Channel 对象,对于文件,获取到的是其派生类 FileChannel

// rw 表示 read and write
RandomAccessFile randomAccessFile = new RandomAccessFile("test.txt","rw");
FileChannel fileChannel = randomAccessFile.getChannel();

然后我们需要构造一个 Buffer,这里获取到的依然是其派生类 ByteBuffer

// 1024 为 capacity
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

先将数据写入文件,要将数据写入文件(即写入 Channel),要先将数据写入 Buffer

byteBuffer.asCharBuffer().append("Hello NIO!");

然后将数据写入 channel

fileChannel.write(byteBuffer);

是不是挺简单的,接下来我们来读取一下刚刚写入的数据,首先处理一下指针:

fileChannel.position(0); // 管道指针复原
byteBuffer.clear(); // buffer 复原
fileChannel.read(byteBuffer); // 从管道读数据到 buffer

现在数据已经写入 Buffer,我们来从 Buffer 读取一下,别忘记翻转 Buffer

byteBuffer.flip(); // 翻转

StringBuilder stringBuilder = new StringBuilder();
while(byteBuffer.position() < byteBuffer.limit()){ // 指针判断
    stringBuilder.append(byteBuffer.getChar()); // 按 char 读
}
System.out.println(stringBuilder.toString());

因为这里是字符串,因此可以使用上面说的特殊方法:

System.out.println(byteBuffer.asCharBuffer().toString());

可以看到我们打印成功了刚刚写入的消息,可喜可贺!

总代码:

/**
 * Created by HeYanLe on 2021/4/2 22:44.
 * https://github.com/heyanLE
 */
public class Main {

    public static void main (String[] args) throws IOException{
        RandomAccessFile randomAccessFile = new RandomAccessFile("test.txt","rw");
        FileChannel fileChannel = randomAccessFile.getChannel();

        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        byteBuffer.asCharBuffer().append("Hello NIO!");

        fileChannel.write(byteBuffer);

        fileChannel.position(0); // 管道指针复原
        byteBuffer.clear(); // buffer 复原
        fileChannel.read(byteBuffer); // 从管道读数据到 buffer
        byteBuffer.flip(); // buffer 翻转

        Selector selector = Selector.open();

        // 方法一
        System.out.println(byteBuffer.asCharBuffer().toString());


        byteBuffer.flip();

        // 方法二
        StringBuilder stringBuilder = new StringBuilder();
        while(byteBuffer.position() < byteBuffer.limit()){
            stringBuilder.append(byteBuffer.getChar());
        }
        System.out.println(stringBuilder.toString());


    }

}

网络

这里以 socket 的阻塞式写法举例,以下是 Channel 的获取,关于 Buffer 的操作,和以上介绍一致,这里不做过多介绍:

// 客户端
SocketChannel socketChannel = SocketChannel.open();
        
socketChannel.bind(new InetSocketAddress("127.0.0.1", 8013));

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
socketChannel.read(byteBuffer);
 System.out.println(stringBuilder.toString());


// 服务端
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.bind(new InetSocketAddress( 8013));
while(true){
    SocketChannel client = socketChannel.accept(); // 用户连接会阻塞
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    byteBuffer.asCharBuffer().append("Hello server, I am Client !");
    client.write(byteBuffer);
}

selector

对于网络编程,以上写法是阻塞式的,对于服务端,每有一个用户连接,就需要开一个线程,线程需要耗费额外资源,在一些高并发的场景,多个线程的效率会变得非常低。NIO 为网络提供了一种非阻塞式的写法,例如:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.bind(new InetSocketAddress( 8013));
serverSocketChannel.configureBlocking(false); // 非阻塞式

此时如果你调用 socketChannel.accpet() 方法,将不会阻塞线程,如果没用连接,则会返回 null

同时,对于 socketChannel 也有类似操作(当服务端收到客户端连接时候,会生成一个 socketChannel 对象用于与客户端通信):

SocketChannel socketChannel = serverSocketChannel.accpet();

然后我们可以使用一个 selector 来管理。selector 是多路复用器,它会选择一个需要处理的 Channel 来处理,当然 selector 只能处理非阻塞 channel,也就是上面的 FileChannel 就无法使用。

SelectionKey

这个对象表示了一个 channel 的状态,它本身并不在 channel 的成员变量中,只是在 selector 中针对每个 channel 维护了一个 selectionKey

selectionKey 采用状态压缩的方法维护了四个布尔值的状态,分别是:

  • isAcceptable:ServerSocketChannel 才有意义,当有用户连接的时候,这个状态会变为 true,此时调用对应 channelaccpet() 方法能拿到连接的 socketChannel 对象。
  • isChonnect: 当连接成功建立的时候,这个状态会变为 true
  • isWritable:当通道可写的时候,这个状态会变为 true,一般不用关心这个事件,只要这个 Channel 正常,缓冲区不为空,则可写
  • isReadable: 当可读的时候,这个状态会变为 true,一开始为 false,只有当接收到数据,有数据可读的时候,这个状态才会变为 true

Selector 工作过程

首先获取一个 selector 对象:

Selector selector = Selector.open();

我们使用如下代码来将一个 channel 注册到 selector 中:

serverSocketChannel.register(selector,
        SelectionKey.OP_ACCEPT|SelectionKey.OP_READ
);

这里 SelectionKey.OP_ACCEPT|SelectionKey.OP_READ 表示我们对 ACCPETREAD 的状态感兴趣。

然后我们调用 selector.select()方法,这个方法会阻塞,然后返回当前处于 被选中 状态的 channel 的数量。 被选中channel 具有如下特征:

  1. 这个 channel 在当前 selector 注册了
  2. 这个 channel 在注册时传入的 感兴趣的某个状态当前为 true

同时可能有多个 channel 符合被选择的条件,在 select() 阻塞完毕后,我们可以调用 selector.selectedKeys() 获得当前所有 被选中 状态的 SelectionKey 对象,这个对象可以拿到 Channel 对象,然后进行操作。

例如,我们这里写一个小 demo,写一个服务端,将所有客户端发来的消息都打印,首先是客户端:这里比较简单,因为客户端不用接收服务端发来的消息,我们直接检测输入然后发给服务端:

/**
 * Created by HeYanLe on 2021/4/6 21:32.
 * https://github.com/heyanLE
 */
public class ClientDemo {

    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 6556));
        ReadableByteChannel readableByteChannel = Channels.newChannel(System.in);
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        while(true){
            byteBuffer.clear();
            readableByteChannel.read(byteBuffer);
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
        }
    }
}

这里我们使用 Channels.newChannel(InputStream) 方法拿到了一个可读 channel,然后读完之后直接将 byteBuffer 原样发给客户端。

然后是服务端,首先是服务端本身:

ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.bind(new InetSocketAddress( 6556));
socketChannel.configureBlocking(false);

socketChannel.accept();

Selector selector = Selector.open();
socketChannel.register(selector,
        SelectionKey.OP_ACCEPT // 当出现 SelectionKey.OP_ACCEPT 事件时,停止阻塞
);

我们构造了一个ServerSocketChannel,然后注册到一个 Selector 里,这里我们只关注 ACCEPT 状态。

然后我们死循环来选择:

while(true){
    selector.select();
    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    while(iterator.hasNext()){
        SelectionKey selectionKey = iterator.next();
        handlerSelectionKey(selectionKey, iterator, selector);
    }
}

调用 select() 来阻塞,然后通过 selectedKeys() 方法获取所有选中 SelectionKey,然后使用迭代器来处理每一个选中的 SelectionKey

然后是处理事件了:

private static void handlerSelectionKey (
    SelectionKey selectionKey,
    Iterator<SelectionKey> iterator, 
    Selector selector) throws IOException{

        if(selectionKey.isAcceptable()){ // 有新客户端连接
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            SocketChannel socketChannel1 = serverSocketChannel.accept(); // 获取 socketChannel 非阻塞,因为已经有了状态所以一定非空
            socketChannel1.configureBlocking(false); // 非阻塞
            socketChannel1.register(selector,
                    SelectionKey.OP_READ  //对于客户端只关心可读
            );
            iterator.remove(); // 移除状态,等待下次新客户端连接


        } else if(selectionKey.isReadable()){

        }
    }

这里我们先判断状态,如果是 Acceptable 说明是服务端,我们获取 channel 并强转成 ServerSocketChannel,然后我们调用 accpet() 获取连接,注意虽然刚刚说了非阻塞这个可能为 null,但这里的 selectionKeyisAcceptabletrue 表示一定有,这里返回的不可能为空。

然后我们将这个 socketChannel 设置为 非阻塞,然后在同一个 selector 中注册,这里我们只关心可读状态。

然后是可读了,直接读即可:

……
else if(selectionKey.isReadable()){
    SocketChannel socketChannel1 = (SocketChannel) selectionKey.channel();
    byteBuffer.clear();
    socketChannel1.read(byteBuffer);
    byteBuffer.flip();

    Charset charset = Charset.defaultCharset();
    CharBuffer charBuffer = charset.decode(byteBuffer);
    System.out.println((charBuffer.toString().intern()));
    iterator.remove();
}
……

注意因为编码问题,这里需要一个 Charset 来解码数据。

注意这两个处理最后访问了 iterator.remove(); 来移除当前 selectionKey 的选中状态,否则下次 selecte 方法里依然有这个 selectionKey

总代码:

/**
 * Created by HeYanLe on 2021/4/2 22:44.
 * https://github.com/heyanLE
 */
public class Main {

    private static final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    private static void handlerSelectionKey (SelectionKey selectionKey,Iterator<SelectionKey> iterator, Selector selector) throws IOException{
        if(selectionKey.isAcceptable()){ // 有新客户端连接
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
            SocketChannel socketChannel1 = serverSocketChannel.accept(); // 获取 socketChannel 非阻塞,因为已经有了状态所以一定非空
            socketChannel1.configureBlocking(false); // 非阻塞
            socketChannel1.register(selector,
                    SelectionKey.OP_READ  //对于客户端只关心可读
            );
            iterator.remove(); // 移除状态,等待下次新客户端连接


        } else if(selectionKey.isReadable()){
            SocketChannel socketChannel1 = (SocketChannel) selectionKey.channel();
            byteBuffer.clear();
            socketChannel1.read(byteBuffer);
            byteBuffer.flip();

            Charset charset = Charset.defaultCharset();
            CharBuffer charBuffer = charset.decode(byteBuffer);
            System.out.println((charBuffer.toString().intern()));
            iterator.remove();
        }
    }
    public static void main (String[] args) throws IOException{

        ServerSocketChannel socketChannel = ServerSocketChannel.open();


        socketChannel.bind(new InetSocketAddress( 6556));
        socketChannel.configureBlocking(false);

        socketChannel.accept();

        Selector selector = Selector.open();
        socketChannel.register(selector,
                SelectionKey.OP_ACCEPT // 当出现 SelectionKey.OP_ACCEPT 事件时,停止阻塞
        );


        while(true){
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()){
                SelectionKey selectionKey = iterator.next();
                handlerSelectionKey(selectionKey, iterator, selector);
            }
        }
    }

}

后记

java.nioapi 非常反人类,每次操作都需要处理+ Buffer。在实际工作中,我们很少或者是几乎不会用到 nio (我是宁愿用传统 I/O),但是在我看来,nio 有两个意义:

  1. 比起传统 I/O 有了结构化抽象化的意义,其抽象出了 ChannelBuffer 对象,这种思维是可以学习的,把重点放在了端,而传统的 IO流 重点是数据流。
  2. 在网络的 Channel 中,可以使用非阻塞式的写法,在传统 I/O 中,如果你要写一个百万级服务器,那么每一个客户端都需要新建一个线程,因为 传统 I/O 只能是阻塞的,那么你需要开 上万个线程,这对操作系统来说压力是比较大的,如果你使用线程池的话,也是需要一个超时时间,一个线程同时只能处理一个 流。而使用 非阻塞 NIO 的话,我们可以使用非阻塞式,然后使用一个 Selector 对象来多路复用,一个线程可以管理多个 Channel

但是这两点在其缺点看来依然是微不足道。因此 nio 虽然思想很好,但是真正使用的的人是微乎其微。