PipedOutputStream 和 PipedInputStream 源码分析

618

之前简单介绍了一下 Java 中的 IO 流 机制,和几种 IO 流 的特性和使用场景,在分析的时候看到一些 IO 流 的代码写的挺不错的,来简单分析几个的源码把,今天先看看 PipedOutputStreamPipedInputStream

引子

这两个 IO 流,的主要特点是支持相互连接,其他 IO 流基本上读和写都是一边的,另一边是一个固定的地方。而如果你把 PipedInputrStreamPipedOutputStream 连接之后,在 PipedOutputStream 写入的东西,可以在另一边的 PipedInputStream 读取到。在使用的时候也可以连接后使用多态,以 InputStreamOutputStream 的形式返回

PipedOutputStream

先从输出流讲起把。输出流比较简单,来看看 Structure

image.png

构造方法

这个构造方法比较简单,只有两个,其中无参构造方法为空,另一个构造方法会调用 connect 方法:

public PipedOutputStream() {
}

public PipedOutputStream(PipedInputStream snk)  throws IOException {
    connect(snk);
}

connect

来看看 connect 方法:

public synchronized void connect(PipedInputStream snk) throws IOException {
    if (snk == null) {
        throw new NullPointerException();
    } else if (sink != null || snk.connected) {
        throw new IOException("Already connected");
    }
    sink = snk;
    snk.in = -1;
    snk.out = 0;
    snk.connected = true;
}

还是比较简单的,首先是空安全,然后是确保只能一对一连接,然后把成员变量设置为连接的对象,把对象标志置为初始值,把对象连接标记置为 true;

为了方便,之后的描述中,sink 表示 PipedInputStreamsonk 表示 PipedOutputStream

flush 和 close

public synchronized void flush() throws IOException {
    if (sink != null) {
        synchronized (sink) {
            sink.notifyAll();
        }
    }
}

public void close()  throws IOException {
    if (sink != null) {
        sink.receivedLast();
    }
}

flush 比较简单,首先是自己的同步锁还有连接的 sink 的同步锁,然后唤醒所有 sink 中等待的线程,每次我们写入之前都要先调用 flush 否则很有可能 sink 会延迟读到消息,这里有 notifyAll 说明在某个地方调用了 wait 方法,再等等的 sink 中就会看见。

close 也是直接调用 sinkreceivedLast 方法。

write

说到输出流,肯定需要看 write 方法啦,基类里有两个个重载,具体如下:

public void write(int b)  throws IOException {
    if (sink == null) {
        throw new IOException("Pipe not connected");
    }
    sink.receive(b);
}
public void write(byte b[], int off, int len) throws IOException {
    if (sink == null) {
        throw new IOException("Pipe not connected");
    } else if (b == null) {
        throw new NullPointerException();
    } else if ((off < 0) || (off > b.length) || (len < 0) ||
                ((off + len) > b.length) || ((off + len) < 0)) {
        throw new IndexOutOfBoundsException();
    } else if (len == 0) {
        return;
    }
    sink.receive(b, off, len);
}

可以看到,在判断参数合法之后,调用的是 sinkreceive 方法,等等我们可以具体看看这个方法。

至此,PipedOutputStream 的源码就这么多,我们可以看到,其内部其实比较简单,主要是持有一个连接的 PipedInputStream 对象 sink,然后写入,刷新和关闭等方法都是调用 sink 的对应方法实现,所以 PipedOutputStream 主要是维持连接还有写入的功能,这个流的主要数据实现是在 PipedInputStream 中。

PipedInputStream

先看 Structure,发现比 PipedOutputStream 复杂得多:

image.png

构造方法

有四个构造方法,如下:

public PipedInputStream(PipedOutputStream src) throws IOException {
    this(src, DEFAULT_PIPE_SIZE);
}

public PipedInputStream(PipedOutputStream src, int pipeSize)
            throws IOException {
    initPipe(pipeSize);
    connect(src);
}

public PipedInputStream() {
    initPipe(DEFAULT_PIPE_SIZE);
}
public PipedInputStream(int pipeSize) {
    initPipe(pipeSize);
}

可以看到最终都会来到 initPipeconnect 方法。

initPiped

private void initPipe(int pipeSize) {
        if (pipeSize <= 0) {
        throw new IllegalArgumentException("Pipe Size <= 0");
        }
        buffer = new byte[pipeSize];
}

这个方法主要是开辟缓存空间,其中 buffer 是成员变量。

connect

public void connect(PipedOutputStream src) throws IOException {
    src.connect(this);
}

可以看到是调用 PipedOutputStreamconnect 方法,说明 PipedOutputStream 并不持有连接的对象,连接是交给对方维护的,本身只维护一个是否连接的布尔值。

成员变量

先来介绍一下几个成员变量:

boolean closedByWriter; // 写端关闭
volatile boolean closedByReader; // 读端关闭

boolean connected; // 是否连接

Thread readSide; // 写端的线程
Thread writeSide; // 读端的线程

protected byte buffer[]; // 缓存空间

protected int in = -1; // 写入端指针
protected int out = 0; // 读取端指针

其实内部的缓存空间相当于循环数组实现的队列,写入端不断写入数据,而读取端不断读取,可以参考 gif

![GIF 2021-4-3 16-16-27.gif][3]

这个模型还是不难理解的,难点在于线程同步问题。

不过注意,在实际代码中, inout 的指针并不是和图里一致,如果和图里一致,会出现无法判断是空的还是满的问题,在代码中,如果缓冲区为空,则 in-1,如果 in == out ,则表示缓冲区已满。

receive

这里的 receive 有两个重载,我们看只收到一个字节的方法分析比较简单,另一个会给出,可自行分析:

protected synchronized void receive(int b) throws IOException {
    checkStateForReceive();
    writeSide = Thread.currentThread();
    if (in == out)
        awaitSpace();
    if (in < 0) {
        in = 0;
        out = 0;
    }
    buffer[in++] = (byte)(b & 0xFF);
    if (in >= buffer.length) {
        in = 0;
    }
}

首先调用 checkStateForReceive 确保状态正常,然后将本线程赋值给 writeSide ,注意这里是写线程。然后如果 in==out ,则表示当前缓冲区已满,此时调用 awaitSpace 等待空间,具体 checkStateForReceiveawaitSpace 后面会给出,然后如果 in<0 ,一般为 -1 ,表示当前缓冲区为空,这里先将两个标志都置为 0,因为是循环队列,当不断读写的时候,缓冲区为空的时候,out 可能并不为 0

然后将数 b 放入缓冲区,注意这里缓冲区是 byte ,是一个字节,因此先和 0xFF 做一个与操作保留最低八位然后强转成功 byte

如果写入后 in >= buffer.length 表示写到了结尾,这里则回到 0 ,下一次从 0 写入,也就是循环队列。

关于另一个重载,其实思路类似,只不过多了一些空间和下标的判断:

synchronized void receive(byte b[], int off, int len)  throws IOException {
    checkStateForReceive();
    writeSide = Thread.currentThread();
    int bytesToTransfer = len;
    while (bytesToTransfer > 0) {
        if (in == out)
            awaitSpace();
        int nextTransferAmount = 0;
        if (out < in) {
            nextTransferAmount = buffer.length - in;
        } else if (in < out) {
            if (in == -1) {
                in = out = 0;
                nextTransferAmount = buffer.length - in;
            } else {
                nextTransferAmount = out - in;
            }
        }
        if (nextTransferAmount > bytesToTransfer)
            nextTransferAmount = bytesToTransfer;
        assert(nextTransferAmount > 0);
        System.arraycopy(b, off, buffer, in, nextTransferAmount);
        bytesToTransfer -= nextTransferAmount;
        off += nextTransferAmount;
        in += nextTransferAmount;
        if (in >= buffer.length) {
            in = 0;
        }
    }
}

read

作为 OutputStream ,还是要看 read 方法,这里也是有两个重载,我们看读一个字节的来分析,比较简单:

public synchronized int read()  throws IOException {
    if (!connected) {
        throw new IOException("Pipe not connected");
    } else if (closedByReader) {
        throw new IOException("Pipe closed");
    } else if (writeSide != null && !writeSide.isAlive()
                && !closedByWriter && (in < 0)) {
        throw new IOException("Write end dead");
    }

    readSide = Thread.currentThread();
    int trials = 2;
    while (in < 0) {
        if (closedByWriter) {
            /* closed by writer, return EOF */
            return -1;
        }
        if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
            throw new IOException("Pipe broken");
        }
        /* might be a writer waiting */
        notifyAll();
        try {
            wait(1000);
        } catch (InterruptedException ex) {
            throw new java.io.InterruptedIOException();
        }
    }
    int ret = buffer[out++] & 0xFF;
    if (out >= buffer.length) {
        out = 0;
    }
    if (in == out) {
        /* now empty */
        in = -1;
    }

    return ret;
}

开头三个 if 是合法性判断,然后将 当前线程赋值给 readSize ,这里是读线程

然后是一个循环,如果 in<0 ,注意看我们最后面,如果所有数据都被读完了则把 in=-1,因此这个循环表示只要缓冲区无数据就会一直阻塞,然后循环里如果被关闭了则返回 -1,第二个则是判断如果写端存在并且写端已经死了,则将标志位 -1,如果标志位为 0 则抛异常。然后就 notifyAll 唤醒所有同步锁中的线程,然后等待 1s,这里同步锁中的线程可能是读线程也可能是写线程这里的线程模型等等会再次说明,这里先直接这样看过,然后就是获取缓冲区里 out 下标的数据然后返回。

第二个 read 如下,可以自己分析一下,也是类似,只不过加入了数据和下标的处理:

public synchronized int read(byte b[], int off, int len)  throws IOException {
    if (b == null) {
        throw new NullPointerException();
    } else if (off < 0 || len < 0 || len > b.length - off) {
        throw new IndexOutOfBoundsException();
    } else if (len == 0) {
        return 0;
    }

    /* possibly wait on the first character */
    int c = read();
    if (c < 0) {
        return -1;
    }
    b[off] = (byte) c;
    int rlen = 1;
    while ((in >= 0) && (len > 1)) {

        int available;

        if (in > out) {
            available = Math.min((buffer.length - out), (in - out));
        } else {
            available = buffer.length - out;
        }

        // A byte is read beforehand outside the loop
        if (available > (len - 1)) {
            available = len - 1;
        }
        System.arraycopy(buffer, out, b, off + rlen, available);
        out += available;
        rlen += available;
        len -= available;

        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            /* now empty */
            in = -1;
        }
    }
    return rlen;
}

checkStateForReceive 和 awaitSpace

private void checkStateForReceive() throws IOException {
    if (!connected) {
        throw new IOException("Pipe not connected");
    } else if (closedByWriter || closedByReader) {
        throw new IOException("Pipe closed");
    } else if (readSide != null && !readSide.isAlive()) {
        throw new IOException("Read end dead");
    }
}

private void awaitSpace() throws IOException {
    while (in == out) {
        checkStateForReceive();

        /* full: kick any waiting readers */
        notifyAll();
        try {
            wait(1000);
        } catch (InterruptedException ex) {
            throw new java.io.InterruptedIOException();
        }
    }
}

checkStateForReceive 主要是检查当 reveive 调用的时候是否正常,异常清空包括未连接,流已经被关闭还有读方线程已经死亡等清空。

awaitSpace 则是等待缓冲区的空间,因为缓冲区是循环数组,有可能已经满了,如果 in==out 则表示缓冲区已满,注意,这个方法是在 receve 方法里调用的,此时是写线程调用。然后调用 notifyAll 来唤醒所有锁中等待的线程,然后自己等待一秒,直到有空间为止。

receivedLast 和 available

synchronized void receivedLast() {
    closedByWriter = true;
    notifyAll();
}

public synchronized int available() throws IOException {
    if(in < 0)
        return 0;
    else if(in == out)
        return buffer.length;
    else if (in > out)
        return in - out;
    else
        return in + buffer.length - out;
}

这两个比较简单, receivedLast 是由 outputStreamclose 方法调用,直接将一个标记置为 true ,然后唤醒所有等待的线程

available 则是通过下标计算当前缓冲区中的数据长度。

源码分析完了,这里面主要是有一个

线程模型

最左边是 PipedOutputStream 对象,上了同步锁。

具体操作的效果如下:

  • 当调用 notifyAll() 方法时,将所有等待区的线程放入竞争区
  • 当调用 wait(1000) 方法时,将自己放入等待区,如果 1000 毫秒后还在等待区,则加入竞争区
  • 当左边的对象空闲的时候,从竞争区随机一个线程可以进入重新上锁

而对于读线程和写线程,刚刚的源码分析可以发现:

  • 当读线程获取对象后发现没有数据可读,则调用一次 notifyAll,然后调用一次 wait(1000)
  • 当写线程获取对象后发现没有数据可写,则调用一次 notifyAll, 然后调用一次 wait(1000)