AQS - 条件队列.md

何言 2021年08月11日 85次浏览

JUC 系列之 AQS。本文主要介绍 AQS 中条件队列的实现。在此之前希望你能对 AQS 的独占锁部分进行相关了解。

条件队列

AQS 主要是对标 JDK 语法的 Synchronized ,而条件队列 (Condition) 则对标 JDK 语法中的 wait, notify 等方法。

来看看如下代码:

synchronized(obj){
    // 同步代码1
    obj.wait();
    // 同步代码2
}

当我们进行到 synchronized 时,会尝试获取 obj 锁,如果没获取到则一直阻塞(自旋或挂起),获得锁后将执行 同步代码1,当执行到 wait 时,线程会尝试将放弃锁,然后进入到 obj 的条件队列中,直到某个获取该锁的对象执行 notify 方法唤醒。

其中 notify 唤醒条件队列中队首线程,notifyAll 唤醒条件队列的所有线程。

这里是要区分条件队列和锁本身的阻塞队列,当调用 notifyAll 时,所有线程会从条件队列出队并全部唤醒,然后全部代码会尝试重新获取锁,之后的发展要看锁的类型而定。而 notify 只唤醒队首线程,然后其开始尝试重新获取锁,其他线程依然在条件队列中。

如果使用 AQS + Condition 的话,以上代码相当于:

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
try {
  lock.lock();
  condition.await();
} catch (InterruptedException e) {
  e.printStackTrace();
} finally {
  lock.unlock();
}

这里使用了 Reentrant 作为锁,而新建了一个 Condition 对象,当调用 condition.await();时,线程会立即放弃当前锁并加入条件队列,等待唤醒。

可以看到这里方法名不太一样,这里给出对照:

JDKCondition
wait()await()
notify()signal()
notifyAll()signalAll()

这里需要注意一点,当你使用 JUC 中的锁时,是可以指定锁的类型,同个锁也可以实现多个条件队列。

而如果你使用 JDK 的语法的话,synchronized 为可重入非公平锁,而 wait 与 notify 只可以实现一个条件队列。

Condition

Condition 是一个接口,其方法如下:

image20210725160928893.png

这个接口主要是规定了条件的 await 和 signal 方法,其主要实现为 AQS 中的内部类 ConditionObject, 如下图:

image20210725161151869.png

因此我们主要来分析 ConditionObject 的源码

ConditonObject

首先是维护了一条链队列,其中头节点为 firstWaiter ,尾结点为 lastWaiter,注意,虽然这里使用的是与之前的等待队列一样的 Node ,但这里是单向链表,其指向下一个结点的域为 nextWaiter,注意,当一个结点在条件队列里时,其 nextWaiter 就不再是指其时共享锁还是独占锁,而是下一个结点。

接下来将分析比较重要的 await 与 signal 方法:

public final void await() throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    } else {
        AbstractQueuedSynchronizer.Node node = this.addConditionWaiter();
        int savedState = AbstractQueuedSynchronizer.this.fullyRelease(node);
        int interruptMode = 0;

        while(!AbstractQueuedSynchronizer.this.isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = this.checkInterruptWhileWaiting(node)) != 0) {
                break;
            }
        }

        if (AbstractQueuedSynchronizer.this.acquireQueued(node, savedState) && interruptMode != -1) {
            interruptMode = 1;
        }

        if (node.nextWaiter != null) {
            this.unlinkCancelledWaiters();
        }

        if (interruptMode != 0) {
            this.reportInterruptAfterWait(interruptMode);
        }

    }
}

首先检查一下中断标记,然后调用 addConditionWaiter 方法添加一个结点,这里是直接 new 一个新的节点:

private AbstractQueuedSynchronizer.Node addConditionWaiter() {
    if (!AbstractQueuedSynchronizer.this.isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    } else {
        AbstractQueuedSynchronizer.Node t = this.lastWaiter;
        if (t != null && t.waitStatus != -2) {
            this.unlinkCancelledWaiters();
            t = this.lastWaiter;
        }

        AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(-2);
        if (t == null) {
            this.firstWaiter = node;
        } else {
            t.nextWaiter = node;
        }

        this.lastWaiter = node;
        return node;
    }
}

首先判断当前线程是不是持有锁并且正在运行,如果不是则抛异常,这里是保证必须要先持有某个 AQS 锁才能进行条件等待。

然后是添加新节点的操作,结点的 waitStatus 为 -2,为常量 CONDITION 的值。

首先需要去除一下不可用节点,也就是调用 unlinkCancelledWaiters 方法。:

private void unlinkCancelledWaiters() {
    AbstractQueuedSynchronizer.Node t = this.firstWaiter;

    AbstractQueuedSynchronizer.Node next;
    for(AbstractQueuedSynchronizer.Node trail = null; t != null; t = next) {
        next = t.nextWaiter;
        if (t.waitStatus != -2) {
            t.nextWaiter = null;
            if (trail == null) {
                this.firstWaiter = next;
            } else {
                trail.nextWaiter = next;
            }

            if (next == null) {
                this.lastWaiter = trail;
            }
        } else {
            trail = t;
        }
    }

}

然后在将其插入队列尾,这里队列是没有头节点的,所以需要进行 null 判断。

回到 await 。添加节点后,会调用 AQS 的 fullyRelease 方法释放当前线程占用的所有锁:

    final int fullyRelease(AbstractQueuedSynchronizer.Node node) {
        try {
            int savedState = this.getState();
            if (this.release(savedState)) {
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } catch (Throwable var3) {
            node.waitStatus = 1;
            throw var3;
        }
    }

首先调用 state 获取其 arg(AQS 提供的子类上锁的参数,对于可重入锁来说 state 为重入层数)。然后调用 release 方法 ,直接释放锁。

之后就是阻塞了,此时线程已经进入条件队列,同时也释放了锁,也唤醒了等待线程(唤醒后不一定能获得锁,在非公平锁的环境下,有可能此时刚来竞争的线程会获得锁),此时就需要等待条件释放了。

首先是一个自旋,条件为该节点不在等待队列里,然后调用 park 将其挂起,之后会检查中断位以进行相关操作。只要该节点不在等待队列中,就一直循环,因此我们可以猜测在 signal 操作中,我们除了要唤醒线程,还需要将其放入等待队列。

当其他线程调用 signal 将其放入等待队列并唤醒后,这里会跳出循环,之后需要重新获取锁,这里调用的是 acquireQueued 方法,其中第二个参数是 state,表示需要获取同样的 arg,进入 acquireQueued 方法后,该节点就和普通等待队列中的结点一致,一直在自旋等待与挂起自己等待释放。

后面的代码为去除无效结点与中断设置。至此 await 操作完成。

当然,除了 无限等待,条件队列还提供了超时等待 ( awaitNanos ) 与限期等待 (awaitUntil)的方法,如下:(以超时等待为例)

public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    } else {
        long deadline = System.nanoTime() + nanosTimeout;
        AbstractQueuedSynchronizer.Node node = this.addConditionWaiter();
        int savedState = AbstractQueuedSynchronizer.this.fullyRelease(node);

        int interruptMode;
        for(interruptMode = 0; !AbstractQueuedSynchronizer.this.isOnSyncQueue(node); nanosTimeout = deadline - System.nanoTime()) {
            if (nanosTimeout <= 0L) {
                AbstractQueuedSynchronizer.this.transferAfterCancelledWait(node);
                break;
            }

            if (nanosTimeout > 1000L) {
                LockSupport.parkNanos(this, nanosTimeout);
            }

            if ((interruptMode = this.checkInterruptWhileWaiting(node)) != 0) {
                break;
            }
        }

        if (AbstractQueuedSynchronizer.this.acquireQueued(node, savedState) && interruptMode != -1) {
            interruptMode = 1;
        }

        if (node.nextWaiter != null) {
            this.unlinkCancelledWaiters();
        }

        if (interruptMode != 0) {
            this.reportInterruptAfterWait(interruptMode);
        }

        long remaining = deadline - System.nanoTime();
        return remaining <= nanosTimeout ? remaining : -9223372036854775808L;
    }
}

这里的超时指的是等待 signal 超时,当超时后,会直接不等待条件 signal,直接开始重新竞争锁。

之后是 signal 方法:

public final void signal() {
    if (!AbstractQueuedSynchronizer.this.isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    } else {
        AbstractQueuedSynchronizer.Node first = this.firstWaiter;
        if (first != null) {
            this.doSignal(first);
        }

    }
}

public final void signalAll() {
    if (!AbstractQueuedSynchronizer.this.isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    } else {
        AbstractQueuedSynchronizer.Node first = this.firstWaiter;
        if (first != null) {
            this.doSignalAll(first);
        }

    }
}

可以看到,在检查合法性后,是来到 doSignal 方法与 doSignalAll 方法:

private void doSignal(AbstractQueuedSynchronizer.Node first) {
    do {
        if ((this.firstWaiter = first.nextWaiter) == null) {
            this.lastWaiter = null;
        }

        first.nextWaiter = null;
    } while(!AbstractQueuedSynchronizer.this.transferForSignal(first) && (first = this.firstWaiter) != null);

}

这里把条件队列第一个节点拿出来,然后调用 transferForSignal 方法,如果返回 false,则重复操作,直到队列为空或返回 true,因此来到 transferForSignal 方法:

final boolean transferForSignal(AbstractQueuedSynchronizer.Node node) {
    if (!node.compareAndSetWaitStatus(-2, 0)) {
        return false;
    } else {
        AbstractQueuedSynchronizer.Node p = this.enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, -1)) {
            LockSupport.unpark(node.thread);
        }

        return true;
    }
}

使用 cas 操作将 status 修改为 0 (默认状态,原本为 -2 条件状态)。

然后来到 enq 方法将条件队列第一个结点放入等待队列队尾:

private AbstractQueuedSynchronizer.Node enq(AbstractQueuedSynchronizer.Node node) {
    while(true) {
        AbstractQueuedSynchronizer.Node oldTail = this.tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (this.compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return oldTail;
            }
        } else {
            this.initializeSyncQueue();
        }
    }
}

这里采用自旋 cas 方法保证成功

回到 transferForSignal,这里有一个 if,如果前一个结点的 status 大于零,将会使用 cas 操作将其修改为 -1,如果成功了,则之后该节点和唤醒时必定会唤醒后面结点因此这里没必要唤醒该线程,否则,将唤醒该线程。之后该线程会从 await 继续走,走到 acquireQueued,重新走竞争锁的逻辑。

doSignalAll 其实也比较简单,就是将之前的循环直到成功换成无论是否成功都继续:

private void doSignalAll(AbstractQueuedSynchronizer.Node first) {
    this.lastWaiter = this.firstWaiter = null;

    AbstractQueuedSynchronizer.Node next;
    do {
        next = first.nextWaiter;
        first.nextWaiter = null;
        AbstractQueuedSynchronizer.this.transferForSignal(first);
        first = next;
    } while(next != null);

}

Condition 与 AQS 对应

我们注意到,Condition 是 AQS 的内部类,并不是静态的,而 Condition 中调用的释放锁方法也是自己外部的方法。