NIO 使用
前面学习了 I/O 流
,是 java.io
里的包,了解到 java 在 14 版本后新加入了一套 I/O 流
,在 java.nio
包中,与传统 I/O
流相比,我认为最主要区别有两个:
- 抽象出几个对象,分别是
Channel
、Buffer
和Selector
,他们分别的作用接下来会讲到。 - 针对两个场景做了优化,一是读写文件的场景,在传统
I/O 流
中,对文件读和写是需要两个流,而在NIO
中,一个Channel
对象是可读又可写,(也可以设定只读,这种是特殊需求)。二是网络编程的环境中,在服务端传统I/O 流
的写法中,一个客户端接入,服务端就需要开一个线程进行交互,而对于NIO
,可以使用非阻塞式的写法,同时使用一个Selector
对象去管理多个Channel
,可以做到使用一个线程与多个客户端交互。
先介绍一下 Channel
和 Buffer
Channel
顾名思义,就是管道,这个管道的一端是我们的程序,另一端是另一个地方,可能是网络数据,也可能是文件数据。与传统 I/O 流
不同,这个 Channel
对象是可以双向的,为什么说可以呢,因为有 ReadableByteChannel
和 WritableByteChannel
两个接口给 Channel
加入了 Write()
和 Read()
方法,顺带一提,Channel
的采用了一种接口修饰的设计模式,初始 Channel
基类只提供 isOpen()
和 close()
两个方法。其他各种方法都是通过其他接口加上。
Buffer
顾名思义,就是缓冲区,其内部就是一个数组的实现,可以写入和读取数据,并且提供了各种类型的写入和读取。有四个成员变量需要注意:
capacity
:容量,值这个Buffer
最多能容纳的字节数limit
:当前操作的边界,读操作和写操作时要保证position <= limit
,等下会看见它的用途position
:当前指针,读或写时候的指针mark
:标记,可以标记当前位置,在源码中没看到作用,只是给外界提供一个标记的功能(甚至没看见外界获取mark
的方法)
其中 limit
和 position
方法使用了新的链式调用 Setter/Getter
写法:
Buffer position(int position){
// 省略参数合法判断
this.position = positon;
return this;
}
int position(){
return this.position;
}
Buffer limit(int limit){
// 省略参数合法判断
this.limit = limit;
return this;
}
int limit(){
return this.limit;
}
此外,Buffer
有如下派生类:
- ByteBuffer
- LongBuffer
- ShortBuffer
等等,可以自己去看,基本都能顾名思义
对于数据处理,Byffer
提供了 put
和 get
方法写入和读取数据,注意,对于写入操作,是从 position
往后写入数据,写入后 position
会自动 +1
,对于读取操作,是读取 positon
位置的数据,读取后会自动 +1
,如果读的时候 positon > limit
,则会抛异常。
可以看下图:
![GIF 2021-4-6 16-26-03.gif][1]
可以看到,在读写之前,会涉及到对指针的一些初始化,具体如下:
- 在你使用过
Buffer
之后,下次要写之前,要先调用position(0)
和limit(capacity)
来初始化指针,这里提供给了一个clear()
方法自动帮我们完成以上操作。 - 在通过管道将数据写入
Buffer
后,读取之前,因此此时对于写端,其position
指向它写入的最后一个数据的下一个,limit
指向capacity
,因此要先调用limit(positon())
在调用position(0)
,然后才可以正常读取数据,这里也提供了flip()
方法,翻转这个Buffer
。
// 要写入 Buffer
buffer.clear();
buffer.put(0);// 写入数据
// 写入后要读取
buffer.flip();
while(buffer.position() < buffer.limit()){
buffer.get(); // 一个一个读
}
// 对于 byteBuffer ,如果是字符串,可以用以下方法直接读
String res = byteBuffer.flip().asCharBuffer().toString();
Buffer 和 Channel
因为 NIO
源码非常长,并且 NIO
的 api
并不好用,实际工作中很少,几乎并不会用到(但是其有积极意义,之后会说到)。因此这里直接通过举两个场景,说明如何使用 Buffer
和 Channel
,只作简单了解。
在 NIO
里,操作只有两种情况,一是将 Buffer
里的数据写入 Channel
,二是将 Channel
里的数据写入 Buffer
,这里也是举两个操作作为例子:
文件
首先我们要拿到 Channel
对象,对于文件,获取到的是其派生类 FileChannel
:
// rw 表示 read and write
RandomAccessFile randomAccessFile = new RandomAccessFile("test.txt","rw");
FileChannel fileChannel = randomAccessFile.getChannel();
然后我们需要构造一个 Buffer
,这里获取到的依然是其派生类 ByteBuffer
// 1024 为 capacity
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
先将数据写入文件,要将数据写入文件(即写入 Channel
),要先将数据写入 Buffer
:
byteBuffer.asCharBuffer().append("Hello NIO!");
然后将数据写入 channel
:
fileChannel.write(byteBuffer);
是不是挺简单的,接下来我们来读取一下刚刚写入的数据,首先处理一下指针:
fileChannel.position(0); // 管道指针复原
byteBuffer.clear(); // buffer 复原
fileChannel.read(byteBuffer); // 从管道读数据到 buffer
现在数据已经写入 Buffer
,我们来从 Buffer
读取一下,别忘记翻转 Buffer
:
byteBuffer.flip(); // 翻转
StringBuilder stringBuilder = new StringBuilder();
while(byteBuffer.position() < byteBuffer.limit()){ // 指针判断
stringBuilder.append(byteBuffer.getChar()); // 按 char 读
}
System.out.println(stringBuilder.toString());
因为这里是字符串,因此可以使用上面说的特殊方法:
System.out.println(byteBuffer.asCharBuffer().toString());
可以看到我们打印成功了刚刚写入的消息,可喜可贺!
总代码:
/**
* Created by HeYanLe on 2021/4/2 22:44.
* https://github.com/heyanLE
*/
public class Main {
public static void main (String[] args) throws IOException{
RandomAccessFile randomAccessFile = new RandomAccessFile("test.txt","rw");
FileChannel fileChannel = randomAccessFile.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.asCharBuffer().append("Hello NIO!");
fileChannel.write(byteBuffer);
fileChannel.position(0); // 管道指针复原
byteBuffer.clear(); // buffer 复原
fileChannel.read(byteBuffer); // 从管道读数据到 buffer
byteBuffer.flip(); // buffer 翻转
Selector selector = Selector.open();
// 方法一
System.out.println(byteBuffer.asCharBuffer().toString());
byteBuffer.flip();
// 方法二
StringBuilder stringBuilder = new StringBuilder();
while(byteBuffer.position() < byteBuffer.limit()){
stringBuilder.append(byteBuffer.getChar());
}
System.out.println(stringBuilder.toString());
}
}
网络
这里以 socket
的阻塞式写法举例,以下是 Channel
的获取,关于 Buffer
的操作,和以上介绍一致,这里不做过多介绍:
// 客户端
SocketChannel socketChannel = SocketChannel.open();
socketChannel.bind(new InetSocketAddress("127.0.0.1", 8013));
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
socketChannel.read(byteBuffer);
System.out.println(stringBuilder.toString());
// 服务端
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.bind(new InetSocketAddress( 8013));
while(true){
SocketChannel client = socketChannel.accept(); // 用户连接会阻塞
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.asCharBuffer().append("Hello server, I am Client !");
client.write(byteBuffer);
}
selector
对于网络编程,以上写法是阻塞式的,对于服务端,每有一个用户连接,就需要开一个线程,线程需要耗费额外资源,在一些高并发的场景,多个线程的效率会变得非常低。NIO
为网络提供了一种非阻塞式的写法,例如:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress( 8013));
serverSocketChannel.configureBlocking(false); // 非阻塞式
此时如果你调用 socketChannel.accpet()
方法,将不会阻塞线程,如果没用连接,则会返回 null
。
同时,对于 socketChannel
也有类似操作(当服务端收到客户端连接时候,会生成一个 socketChannel
对象用于与客户端通信):
SocketChannel socketChannel = serverSocketChannel.accpet();
然后我们可以使用一个 selector
来管理。selector
是多路复用器,它会选择一个需要处理的 Channel
来处理,当然 selector
只能处理非阻塞 channel
,也就是上面的 FileChannel
就无法使用。
SelectionKey
这个对象表示了一个 channel
的状态,它本身并不在 channel
的成员变量中,只是在 selector
中针对每个 channel
维护了一个 selectionKey
。
selectionKey
采用状态压缩的方法维护了四个布尔值的状态,分别是:
- isAcceptable:
ServerSocketChannel
才有意义,当有用户连接的时候,这个状态会变为true
,此时调用对应channel
的accpet()
方法能拿到连接的socketChannel
对象。 - isChonnect: 当连接成功建立的时候,这个状态会变为
true
- isWritable:当通道可写的时候,这个状态会变为
true
,一般不用关心这个事件,只要这个Channel
正常,缓冲区不为空,则可写 - isReadable: 当可读的时候,这个状态会变为
true
,一开始为false
,只有当接收到数据,有数据可读的时候,这个状态才会变为true
Selector 工作过程
首先获取一个 selector
对象:
Selector selector = Selector.open();
我们使用如下代码来将一个 channel
注册到 selector
中:
serverSocketChannel.register(selector,
SelectionKey.OP_ACCEPT|SelectionKey.OP_READ
);
这里 SelectionKey.OP_ACCEPT|SelectionKey.OP_READ
表示我们对 ACCPET
和 READ
的状态感兴趣。
然后我们调用 selector.select()
方法,这个方法会阻塞,然后返回当前处于 被选中
状态的 channel
的数量。 被选中
的 channel
具有如下特征:
- 这个
channel
在当前selector
注册了 - 这个
channel
在注册时传入的 感兴趣的某个状态当前为true
同时可能有多个 channel
符合被选择的条件,在 select()
阻塞完毕后,我们可以调用 selector.selectedKeys()
获得当前所有 被选中
状态的 SelectionKey
对象,这个对象可以拿到 Channel
对象,然后进行操作。
例如,我们这里写一个小 demo
,写一个服务端,将所有客户端发来的消息都打印,首先是客户端:这里比较简单,因为客户端不用接收服务端发来的消息,我们直接检测输入然后发给服务端:
/**
* Created by HeYanLe on 2021/4/6 21:32.
* https://github.com/heyanLE
*/
public class ClientDemo {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 6556));
ReadableByteChannel readableByteChannel = Channels.newChannel(System.in);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while(true){
byteBuffer.clear();
readableByteChannel.read(byteBuffer);
byteBuffer.flip();
socketChannel.write(byteBuffer);
}
}
}
这里我们使用 Channels.newChannel(InputStream)
方法拿到了一个可读 channel
,然后读完之后直接将 byteBuffer
原样发给客户端。
然后是服务端,首先是服务端本身:
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.bind(new InetSocketAddress( 6556));
socketChannel.configureBlocking(false);
socketChannel.accept();
Selector selector = Selector.open();
socketChannel.register(selector,
SelectionKey.OP_ACCEPT // 当出现 SelectionKey.OP_ACCEPT 事件时,停止阻塞
);
我们构造了一个ServerSocketChannel
,然后注册到一个 Selector
里,这里我们只关注 ACCEPT
状态。
然后我们死循环来选择:
while(true){
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
handlerSelectionKey(selectionKey, iterator, selector);
}
}
调用 select()
来阻塞,然后通过 selectedKeys()
方法获取所有选中 SelectionKey
,然后使用迭代器来处理每一个选中的 SelectionKey
。
然后是处理事件了:
private static void handlerSelectionKey (
SelectionKey selectionKey,
Iterator<SelectionKey> iterator,
Selector selector) throws IOException{
if(selectionKey.isAcceptable()){ // 有新客户端连接
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel1 = serverSocketChannel.accept(); // 获取 socketChannel 非阻塞,因为已经有了状态所以一定非空
socketChannel1.configureBlocking(false); // 非阻塞
socketChannel1.register(selector,
SelectionKey.OP_READ //对于客户端只关心可读
);
iterator.remove(); // 移除状态,等待下次新客户端连接
} else if(selectionKey.isReadable()){
}
}
这里我们先判断状态,如果是 Acceptable
说明是服务端,我们获取 channel
并强转成 ServerSocketChannel
,然后我们调用 accpet()
获取连接,注意虽然刚刚说了非阻塞这个可能为 null
,但这里的 selectionKey
的 isAcceptable
为 true
表示一定有,这里返回的不可能为空。
然后我们将这个 socketChannel
设置为 非阻塞
,然后在同一个 selector
中注册,这里我们只关心可读状态。
然后是可读了,直接读即可:
……
else if(selectionKey.isReadable()){
SocketChannel socketChannel1 = (SocketChannel) selectionKey.channel();
byteBuffer.clear();
socketChannel1.read(byteBuffer);
byteBuffer.flip();
Charset charset = Charset.defaultCharset();
CharBuffer charBuffer = charset.decode(byteBuffer);
System.out.println((charBuffer.toString().intern()));
iterator.remove();
}
……
注意因为编码问题,这里需要一个 Charset
来解码数据。
注意这两个处理最后访问了 iterator.remove();
来移除当前 selectionKey
的选中状态,否则下次 selecte
方法里依然有这个 selectionKey
总代码:
/**
* Created by HeYanLe on 2021/4/2 22:44.
* https://github.com/heyanLE
*/
public class Main {
private static final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
private static void handlerSelectionKey (SelectionKey selectionKey,Iterator<SelectionKey> iterator, Selector selector) throws IOException{
if(selectionKey.isAcceptable()){ // 有新客户端连接
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel1 = serverSocketChannel.accept(); // 获取 socketChannel 非阻塞,因为已经有了状态所以一定非空
socketChannel1.configureBlocking(false); // 非阻塞
socketChannel1.register(selector,
SelectionKey.OP_READ //对于客户端只关心可读
);
iterator.remove(); // 移除状态,等待下次新客户端连接
} else if(selectionKey.isReadable()){
SocketChannel socketChannel1 = (SocketChannel) selectionKey.channel();
byteBuffer.clear();
socketChannel1.read(byteBuffer);
byteBuffer.flip();
Charset charset = Charset.defaultCharset();
CharBuffer charBuffer = charset.decode(byteBuffer);
System.out.println((charBuffer.toString().intern()));
iterator.remove();
}
}
public static void main (String[] args) throws IOException{
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.bind(new InetSocketAddress( 6556));
socketChannel.configureBlocking(false);
socketChannel.accept();
Selector selector = Selector.open();
socketChannel.register(selector,
SelectionKey.OP_ACCEPT // 当出现 SelectionKey.OP_ACCEPT 事件时,停止阻塞
);
while(true){
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
handlerSelectionKey(selectionKey, iterator, selector);
}
}
}
}
后记
java.nio
的 api
非常反人类,每次操作都需要处理+ Buffer
。在实际工作中,我们很少或者是几乎不会用到 nio
(我是宁愿用传统 I/O),但是在我看来,nio 有两个意义:
- 比起传统
I/O
有了结构化抽象化的意义,其抽象出了Channel
和Buffer
对象,这种思维是可以学习的,把重点放在了端,而传统的IO流
重点是数据流。 - 在网络的
Channel
中,可以使用非阻塞式的写法,在传统I/O
中,如果你要写一个百万级服务器,那么每一个客户端都需要新建一个线程,因为 传统I/O
只能是阻塞的,那么你需要开 上万个线程,这对操作系统来说压力是比较大的,如果你使用线程池的话,也是需要一个超时时间,一个线程同时只能处理一个 流。而使用 非阻塞NIO
的话,我们可以使用非阻塞式,然后使用一个Selector
对象来多路复用,一个线程可以管理多个Channel
。
但是这两点在其缺点看来依然是微不足道。因此 nio
虽然思想很好,但是真正使用的的人是微乎其微。