AQS - 共享锁.md
JUC 系列之 AQS。
本文主要介绍 AQS 实现共享锁的方式与 Semaphore
锁获取
获取锁的话,主要有 两个方法:
public final void acquireShared(int arg) {
if (this.tryAcquireShared(arg) < 0) {
this.doAcquireShared(arg);
}
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
} else {
if (this.tryAcquireShared(arg) < 0) {
this.doAcquireSharedInterruptibly(arg);
}
}
}
可以看到,首先调用 tryAcquireShared
方法获取,这里和独占锁类似,AQS 中直接抛异常,需要由子类实现:
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
需要注意的是这里的返回值不再是 boolean ,而是 int,对应三种情况:
负数:获取锁失败
0 : 获取锁成功,且为最后一个资格
正数:获取锁成功,并且资源还有剩余
之后会来到 doAcquireShared
或 doAcquireSharedInterruptibly
方法,这里只介绍前者。
private void doAcquireShared(int arg) {
AbstractQueuedSynchronizer.Node node = this.addWaiter(AbstractQueuedSynchronizer.Node.SHARED);
boolean interrupted = false;
try {
while(true) {
AbstractQueuedSynchronizer.Node p = node.predecessor();
if (p == this.head) {
int r = this.tryAcquireShared(arg);
if (r >= 0) {
this.setHeadAndPropagate(node, r);
p.next = null;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node)) {
interrupted |= this.parkAndCheckInterrupt();
}
}
} catch (Throwable var9) {
this.cancelAcquire(node);
throw var9;
} finally {
if (interrupted) {
selfInterrupt();
}
}
}
大体和独占锁差不多,只有两个区别,首先调用 addWaiter 时的标识是共享锁结点,如果该结点为第一个结点,则获取锁之后需要调用 setHeadAndPropagate
方法。因为共享锁当你获取锁如果还有资源剩余,则其他线程也可以获取剩下的锁,这里需要唤醒等待队列,这里来到 该方法:
private void setHeadAndPropagate(AbstractQueuedSynchronizer.Node node, int propagate) {
AbstractQueuedSynchronizer.Node h = this.head;
this.setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = this.head) == null || h.waitStatus < 0) {
AbstractQueuedSynchronizer.Node s = node.next;
if (s == null || s.isShared()) {
this.doReleaseShared();
}
}
}
这里主要是将当前结点设置为头节点(出队),然后唤醒之后的共享锁结点,这里的唤醒使用的是 doReleaseShared
方法,在之后锁释放会讲到。
锁释放
释放主要是 releaseShared
方法:
public final boolean releaseShared(int arg) {
if (this.tryReleaseShared(arg)) {
this.doReleaseShared();
return true;
} else {
return false;
}
}
首先调用 tryReleaseShared
方法,这里返回值是一个 布尔值,在 AQS 中也是直接抛出异常,需要由子类重写。
可以看到会来到 doReleaseShared
方法:
private void doReleaseShared() {
while(true) {
AbstractQueuedSynchronizer.Node h = this.head;
if (h != null && h != this.tail) {
int ws = h.waitStatus;
if (ws == -1) {
if (!h.compareAndSetWaitStatus(-1, 0)) {
continue;
}
this.unparkSuccessor(h);
} else if (ws == 0 && !h.compareAndSetWaitStatus(0, -3)) {
continue;
}
}
if (h == this.head) {
return;
}
}
}
可以看到这是一个自旋的过程,这个循环只有一个条件,那就是 h == this.head
,h 为每次循环开始时获取的当前头节点,当运行到最后如果 h != this.head
时,说明当前操作中有某个结点出队了,即有线程获取了锁,此时会再次自旋,直到运行一次后没有线程能获取锁才结束。这里是为了尽可能的唤醒后面的结点去抢锁,因为共享锁一次可以由多个线程持有,为了提高并发度。
来看内部,首先时第一个 if
保证队列不为空 (有头节点),然后根据头节点的 waitStatus
来进行操作,如果该节点的 waitStatus
为 -1 则为 Signal,则说明需要唤醒后面结点,此时先使用 CAS 操作将 -1 改为 0。
注意,刚刚提到 doReleaseShared 方法在 acquireShared 方法中也可能调用,因此这里需要用 CAS 操作保证只有一个线程可以唤醒队头结点。
之后有一个逻辑,需要判断 ws 为 0,并且 会尝试将其修改成 -3,这里可以简单理解为,有可能执行到这里之后,有新的节点加入并挂起,新的节点挂起时一定会将前一个结点设置的 waitStatus 设置为 -1,此时这个释放锁的逻辑如果还没走完,就需要唤醒下一个节点(防止明明资源还有但是恰好后一个线程运行到这里才加入结果需要等下次唤醒),因此这里尝试将其使用 CAS 从 0 变为 -3,如果成功,则说明没有上述问题,直接跳过,否则说明有结点此时加入并修改为了 -1,所以需要调用 continue
再走一次唤醒逻辑。这里在之后的状态轮转会具体说明。
WaitStatus 轮转
WaitStatus 有几种情况:
- 0 默认情况
- -1 Signal 表示后面有结点,需要进行唤醒
- -3 Propagate 强制传播模式 表示处于一种临界状态
- 1 Cancel 表示该节点已经取消
首先,一个节点初始状态其 ws 值都为 0,而有两个地方需要用到,一是锁申请判断是否需要挂起,二是锁释放,我们一个一个来:
回顾一下锁申请部分,其中有用到的地方在:
- setHeadAndPropagate
- shouldParkAfterFailedAcquire
来看看第一个:
private void setHeadAndPropagate(AbstractQueuedSynchronizer.Node node, int propagate) {
AbstractQueuedSynchronizer.Node h = this.head;
this.setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = this.head) == null || h.waitStatus < 0) {
AbstractQueuedSynchronizer.Node s = node.next;
if (s == null || s.isShared()) {
this.doReleaseShared();
}
}
}
首先是设置新的头,然后根据一些情况来进行释放锁的操作,其中表示如果 ws 小于 0,则为 Singnal 或者 Propagate,都需要执行一次释放锁的操作,其实是为了尽可能的多执行释放锁的操作以尽量避免明明还有资源但是阻塞队列因为没有唤醒一直在等待的情况。毕竟唤醒后线程会重新申请锁,如果没有则线程会再次挂起不会影响。
当 propagete >0 ,则无论如何都可以释放锁,因为还剩余资源,同时,当 head 或者 node 的 ws < 0 时,说明刚刚有结点加入并挂起(挂起前需要将前一个结点的 ws 设置为 -1)或者 刚刚有结点执行了释放操作,将 ws 设置为 -1,因为正常逻辑下此处的 ws 应该为 0 。
private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node pred, AbstractQueuedSynchronizer.Node node) {
int ws = pred.waitStatus;
if (ws == -1) {
return true;
} else {
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while(pred.waitStatus > 0);
pred.next = node;
} else {
pred.compareAndSetWaitStatus(ws, -1);
}
return false;
}
}
这里是判断是否需要将结点挂起的逻辑,注意几点,如果 ws 为 -3,则会将其修改为 -1 然后返回 false,这里返回 false 之后会回到 acquireQueued 自旋再次获取锁 。
接下来来到锁释放部分,其实锁释放部分只有一处地方,就是上面的 doReleaseShared
private void doReleaseShared() {
while(true) {
AbstractQueuedSynchronizer.Node h = this.head;
if (h != null && h != this.tail) {
int ws = h.waitStatus;
if (ws == -1) {
if (!h.compareAndSetWaitStatus(-1, 0)) {
continue;
}
this.unparkSuccessor(h);
} else if (ws == 0 && !h.compareAndSetWaitStatus(0, -3)) {
continue;
}
}
if (h == this.head) {
return;
}
}
}
主要有以下规则:
- 唤醒后面结点当且仅当 CAS 将前一个结点的 ws 由 -1 变为 0 成功,这里保证了多个线程同时释放的时候只有一个线程可以释放成功
- 如果头节点 ws 为 0 则一定会将其修改为 -3 以防止有结点此时加入,且该节点刚好为头节点后一个结点并且判定当时没有资源然后将该节点设置为 -1 后准备挂起,此时释放方法执行一半,实际上可能是有资源可以申请的,此时需要将 ws 设置为 -3 让新加入的结点修改为 -1 后再次尝试获取锁。防止出现明明还有资源等待队列却全部挂起的情况。
- 死循环只有一个出口条件,就是
h == this.head
,表示经过一次循环后没有上述需要跳过的情况也没有新线程释放资源(头节点改变一定是某个线程获得了锁)
总结:
- 唤醒后面结点当且仅当 CAS 将前一个结点的 ws 由 -1 变为 0 成功
- 释放锁时如果头节点 ws 为 0 则需要保证将其 CAS 设置为 -3 以防止此时的结点添加判定问题
- 上锁判断挂起时如果前一个结点的 ws 为 -3 或 0 则需要 CAS 将其设置为 -1 并再次尝试获取锁
Semaphore
Semaphore (信号量) 是可重入的共享锁,其主要由 AQS 实现共享锁等待队列。
先看两个构造方法:
public Semaphore(int permits) {
this.sync = new Semaphore.NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
this.sync = (Semaphore.Sync)(fair ? new Semaphore.FairSync(permits) : new Semaphore.NonfairSync(permits));
}
可以看到,根据第二个参数来决定构造公平或不公平的 Sync 对象,第一个参数为最大并发量,这里直接传入 Sync 的构造方法。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
this.setState(permits);
}
final int getPermits() {
return this.getState();
}
final int nonfairTryAcquireShared(int acquires) {
int available;
int remaining;
do {
available = this.getState();
remaining = available - acquires;
} while(remaining >= 0 && !this.compareAndSetState(available, remaining));
return remaining;
}
protected final boolean tryReleaseShared(int releases) {
int current;
int next;
do {
current = this.getState();
next = current + releases;
if (next < current) {
throw new Error("Maximum permit count exceeded");
}
} while(!this.compareAndSetState(current, next));
return true;
}
final void reducePermits(int reductions) {
int current;
int next;
do {
current = this.getState();
next = current - reductions;
if (next > current) {
throw new Error("Permit count underflow");
}
} while(!this.compareAndSetState(current, next));
}
final int drainPermits() {
int current;
do {
current = this.getState();
} while(current != 0 && !this.compareAndSetState(current, 0));
return current;
}
}
Sync 就是主要的 AQS 实现类,可以看到它实现了 nonfairTryAcquireShared
与 tryReleaseShared
方法,这里的非公平锁申请与独占锁类似,主要是因为哪怕是公平锁也需要提供马上竞争锁的方法,因此将非公平尝试获取锁的方法放到父类。
可以看到 nonfairTryAcquireShared
与 tryReleaseShared
采用自旋 CAS 的方式尝试判断 status 是否符合要求同时添加或减少。
此外,还提供了 自旋 CAS 操作的 reducePermits
与 drainPermis
进行资源的控制。
来看看公平锁和非公平锁 AQS 的具体实现:
static final class NonfairSync extends Semaphore.Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return this.nonfairTryAcquireShared(acquires);
}
}
非公平锁还是比较简单的,直接调用父类的非公平获取锁的方法。
static final class FairSync extends Semaphore.Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
int available;
int remaining;
do {
if (this.hasQueuedPredecessors()) {
return -1;
}
available = this.getState();
remaining = available - acquires;
} while(remaining >= 0 && !this.compareAndSetState(available, remaining));
return remaining;
}
}
公平锁主要是加了一层判断,如果等待队列中有线程正在等待,则直接返回 -1 。
来看看 信号量的 P 操作:
首先是 acquire 方法:
public void acquire(int permits) throws InterruptedException {
if (permits < 0) {
throw new IllegalArgumentException();
} else {
this.sync.acquireSharedInterruptibly(permits);
}
}
public void acquireUninterruptibly(int permits) {
if (permits < 0) {
throw new IllegalArgumentException();
} else {
this.sync.acquireShared(permits);
}
}
这里提供了是否可中断的两种方法。参数为要申请的资源数量,可以看到这里是直接调用了 sync 类也就是 AQS 中的方法。
以及带参数的尝试获取方法 (tryAcquire)
public boolean tryAcquire(int permits) {
if (permits < 0) {
throw new IllegalArgumentException();
} else {
return this.sync.nonfairTryAcquireShared(permits) >= 0;
}
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
if (permits < 0) {
throw new IllegalArgumentException();
} else {
return this.sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
}
接下来是释放 V 操作:
public void release(int permits) {
if (permits < 0) {
throw new IllegalArgumentException();
} else {
this.sync.releaseShared(permits);
}
}
public void release() {
this.sync.releaseShared(1);
}
可以看到,Semaphore
并没有对重入层数进行判断,因此 Semaphore
是不可重入的,或者说所有线程共享一个重入数量限制,当重入时,实际上是再重复申请了资源。