概述
标题为初识,其实一两年前就有听课程讲过AQS,实在觉得一是只要理解大概流程就好,二是实在是用不到这个,因此导致其实一直处于 “初识” 的阶段,每当忘记之后又要重新阅读文章,于是有了这篇主动去记录的博文。
最后,在源码分析的环节因为才疏学浅,又因网上文章良莠不齐,难免导致对某些代码进行主观臆测,对于存在问题部分还请评论告知。
整体认识
首先AQS全名为 AbstractQueuedSynchronizer,其本质为一个双向的先进先出队列,它的主要用处是对JUC中的各个同步类库提供一个统一的阻塞管理。 我没有查阅资料,私以为必然是先有的各类同步器的代码(概念),再有的抽象出来的AQS,它是同步器的一个基础设施,另外它只不过是一个队列而已,没有多么复杂。
其中AQS主要作用体现如下:
- 当锁申请互斥的时候由AQS“自动”判定对应请求是否应该进入阻塞等待
- 当锁申请释放的时候由AQS“自动”判定唤醒哪个被阻塞的任务
- 在锁被锁定的时候提供额外的条件阻塞队列,细分互斥条件
而前两个作用主要围绕着的就是AQS类中的变量state。比如拿ReentrantLock为例,其将state变量定义为:如果为0代表当前锁并没有被获得,即当前可以试图获取这把锁,而如果非零,则代表某个线程的已重入次数。
/**
* The synchronization state.
*/
private volatile int state;
释放锁与获取锁
试图获取锁的方式主要是AQS类中的:
- public final void acquire(int arg)
- public final void acquireInterruptibly(int arg)
- public final void acquireShared(int arg)
- public final void acquireSharedInterruptibly(int arg)
其中第一个acquire代表获取互斥锁的方法,第二个是其对应的可被打断的版本,而第三个则是获取共享锁的方法,具体是否能持有锁的细节需要同步器的开发人员自己进行重写这些 “acquire” 方法对应的 “tryAcquire” 方法。
而对于互斥锁与共享锁的区别,如果有用过读写锁的话就可以很容易的理解,即互斥锁与互斥锁、共享锁互斥,共享锁与互斥锁互斥而和共享锁可同时存在。
应用到队列里就是,如果当前锁被持有的是互斥锁,那么队列第一个线程无论是互斥锁还是共享锁都需要阻塞等待。而如果被持有共享锁的线程获取且第一个线程(以及其后的共享锁)是共享锁,那么都可以共同持有。
对于释放锁与获取锁大概风格相同,也就是release、releaseShared,以及需要重写的tryRelease和tryReleaseShared,因此暂且不表。
条件阻塞队列
而对于条件阻塞队列主要是在AQS内部设置了一个名为 ConditionObject 的内部类,通过类变量可知,该类是一个链表,但其节点复用了AQS的,不过内部逻辑是单向的。可以想象被条件变量阻塞(await方法)似乎就是把AQS中未完成条件的节点转移到条件阻塞队列中?当条件阻塞队列的条件被满足的时候(其他线程执行signal或singnalAll方法)再从条件阻塞队列转移到AQS中。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
源码分析
Node 节点结构与状态
除却一个链表节点必须要有的前驱与后继,node节点还有:
- waitStatus,当前节点的状态
- nextWaiter,当前节点后续的等待节点
- thread,当前节点对应的线程
其后node节点的状态主要如下:
- CANCELLED 1, 被取消的节点,在源码中遇到 waitStatus > 0 的条件时都是指被取消的节点,一般需要跳过,继续向前或后遍历
- SIGNAL -1,如果当前节点状态为这个,则代表释放锁后要唤醒后继节点
- CONDITION -2,等待 condition 条件成立的状态
- PROPAGATE -3,工作在共享锁状态,如果是该状态则根据资源的是否剩余来唤醒后继节点
通过 acquire() 获取锁之流程
主要方法有:
- tryAcquire(arg), 尝试获取锁
- addWaiter(Node.EXCLUSIVE),如果获取锁失败了就增加一个类型为互斥锁的节点
- acquireQueued(新增的节点, arg)),判定是否应该阻塞和进行阻塞两种操作,其返回值的语义是在阻塞期间是否有接受过中断
- selfInterrupt(),如果acquireQueued返回true,则打断自己
其中需要注意的事 arg 参数代表要设置的 state 值,如果是单一的互斥锁一般只有0和1,但是会有变化,比如ReentrantLock就有>1的时候,即已重入次数。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire
如前面所述,tryAcquire是同步器自行编写的,因此这里把 ReentrantLock 中非公平锁的tryAcquire 方法拿过来。其核心主要还是通过CAS操作将锁的状态由0转为1视为成功获取锁。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 基于CAS操作将状态设为1
if (compareAndSetState(0, acquires)) {
// 声明当前互斥锁已被当前线程持有
setExclusiveOwnerThread(current);
return true;
}
}
// 如果互斥锁的状态不为0,则再去判断持有锁的线程是不是自己,如果是则认为是“重入”
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
addWaiter
addWaiter 方法的主要用处是去创建节点以及将节点加入到队列之中。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 如果当前队列末尾不为空,则通过CAS的方式将当前节点加到末尾的后面,即新的末尾节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果tail为空,代表当前队列里是空的,当前线程是第一个试图进入阻塞队列的节点
// 因此需要通过enq去初始化队列
enq(node);
return node;
}
其后进入 enq 初始化队列以及将node加入队列。
private Node enq(final Node node) {
// 自旋到成功的将node节点设置为tail
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 如果tail是空的,则代表需要先添加一个虚拟节点
// 之所以需要虚拟节点是因为这样可以方便操作后续节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued
这个方法本质上讲是AQS在获取锁的时候最重要的一个方法,也是看着比较奇怪的方法,因为作者以自旋的策略去重复的绕来绕去以等待正确的“状态”然后去处理。
私以为刚看这个方法先将与 interrupt 有关的字眼全都屏蔽到,先搞清楚这个方法的主题逻辑为上。
首先,既然执行到了 acquireQueued 方法则表明当前的锁已经被持有了,所以理应被阻塞。假设当前节点的前驱节点为新创建的哨兵节点,那么根据逻辑,第一次自旋中的tryAcquire 返回必然是false,而其后也理应阻塞,暂且不表 shouldParkAfterFailedAcquire 中的逻辑,那么当 parkAndCheckInterrupt 方法中进行阻塞操作后醒来,会设置是否中断的标志位,其后再次进行自旋。
而什么时候会醒来呢?理应是它的前驱结点释放锁后主动的将这个线程取消阻塞,因此新一轮的自旋显然就可以通过 tryAcquire 方法获得锁。
其后返回是否被中断过的标志位。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 如果前驱结点是首节点,那么就尝试获取锁,行下克上取而代之之事。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果下克上失败,则判断应不应该把自己阻塞,从而装死
if (shouldParkAfterFailedAcquire(p, node) &&
// 阻塞自己并返回在阻塞期间是否曾被打断过
parkAndCheckInterrupt())
// 如果被打断过,则标记该变量
interrupted = true;
}
} finally {
// 如果在处理逻辑的过程中被中断或者等待超时,则需要将当前节点取消
// 因为与主逻辑无关,暂且不表
if (failed)
cancelAcquire(node);
}
}
- 下面是判断当前线程是否应该阻塞的逻辑,比较绕的一点是如果返回false后会再次执行前面的acquireQueued方法里的自旋再进行一次逻辑的重复。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点的状态
int ws = pred.waitStatus;
// 如果前驱节点状态是SIGNAL,则应该阻塞当前队列
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 如果前驱节点状态 > 0,即处于 CONCELED 状态,
//则应该将现在节点的前驱节点去除到一个正常节点之后,即删掉这些被取消的节点。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前驱结点状态是初始化的0(哨兵节点),或者是 PROPAGATE 状态
// 则应该将前驱节点的状态改为 SIGNAL,即声明:需要在其释放锁后唤醒当前节点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 阻塞当前线程
LockSupport.park(this);
// 返回在被阻塞期间是否中断
return Thread.interrupted();
}
通过 acquireShared() 获取锁之流程
实际上共享锁的整体逻辑与互斥锁大致相同,只不过判定是否进入阻塞队列的逻辑在于当前持有的锁是互斥锁还是共享锁。
其中 tryAcquireShared 对应的逻辑大致为返回剩余的资源,或者还可以获得的锁的数量。
public final void acquireShared(int arg) {
// 需要对应的同步器自行编写逻辑
// 如果小于0,则代表需要加入到阻塞队列
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
doAcquireShared
共享锁在成功获取资源后,与互斥节点的区别在于互斥锁直接返回,而共享锁还有一个传播的概念,即后继节点如果也是想要获取共享锁那么就将其唤醒,即“setHeadAndPropagate”方法中的“Propagate”,但此时还未进入释放锁的阶段,暂且不表。
private void doAcquireShared(int arg) {
// 添加一个类型为共享锁的节点到阻塞队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 返回还能获取的锁的数量,只要 >= 0 就代表仍然可以获取
int r = tryAcquireShared(arg);
if (r >= 0) {
// 更新队列头并唤醒后续的共享锁节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 是否阻塞当前节点,与互斥锁的流程相同,所以下面不再表述
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
通过 release() 释放锁之流程
与 acquire() 相同,tryRelease() 也是需要同步器自行实现,后续依然采用 ReentrantLock 中的设计进行叙述。
而 release 方法实际是很简单,一个是使用 tryRelease 方法把状态值改为0(以ReentrantLock 为例),其后使用 unparkSuccessor 方法去唤醒后继节点。
无非就是有个判断,如果头节点(哨兵节点)不为空,且头节点有被操作过,即 waitSatus != 0,那么就代表头结点初始化后,存在后继节点。 是否还记得使用 shouldParkAfterFailedAcquire 方法判定是否阻塞当前节点的时候要把前驱节点的状态从初始的0改为 -1(SIGNAL) ?
如果当前头节点是空的,或者头节点是0,就是代表没有后继节点。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
如前面所述,tryRelease是同步器自行编写的,因此这里把 ReentrantLock 中的 tryRelease 方法拿过来。
值得注意的是 该方法有的时候由互斥锁和共享锁共同使用,因此判定是否已释放锁的条件为 getState() - releases,其中参数 releases 代表本次释放的次数(资源数)。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果未持有锁,报错
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// state = 0 的时候代表已释放
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
其后是 unparkSuccessor ,这个释放锁的流程中比较重要的方法。其中主要的动作由两部分:
其一,清除当前头节点的状态,由“需要唤醒后续节点(ws < 0)”切换为初始状态的0值。这一段的代码语义很是清晰,但我查阅很多篇文章都没有去讲过为什么需要清除头节点的状态,只能知道在某个同步器中的后继节点会依赖于当前节点的 waitStatus。
其二,选择后继节点进行唤醒,但是后继节点可能为空抑或是被取消。这里主要令人疑惑的点在于为什么会选择从尾部开始遍历? 原因在于,如果从首部开始遍历,因为等待的时间比较长所以被取消的概率远远大于尾部的节点,因此从尾部遍历可以更快的找到一个可用的后继节点。
private void unparkSuccessor(Node node) {
// 如果 ws < 0,则代表node状态是 SIGNAL 或 PROPAGATE,
// 这两种状态的含义都是需要唤醒后需的节点,而在此处是为了清除这个状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 首先认为后继节点是要唤醒的节点
Node s = node.next;
// 但是后继节点可能是空的,也可能是被取消的,所以需要遍历队列
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾部开始往前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 将 s 对应的节点唤醒
LockSupport.unpark(s.thread);
}
通过 releaseShared() 释放锁之流程
对于 releaseShared 方法,主要方法有:
- tryReleaseShared(arg),尝试释放锁
- doReleaseShared(),如果释放锁成功,则调用该方法去唤醒阻塞队列里的其他节点
其中 tryReleaseShared 在 AQS 中并没有实现,所以后需叙述的代码为 ReentrantReadWriteLock 当中的实现。
其次,tryReleaseShared 方法如果确实释放了当前线程持有的读锁,但并不意味着返回 true,因为只有全部共享锁都已经释放之后才需要去调用 doReleaseShared 去唤醒其他节点。
最后需要说明的是,根据互斥锁和共享锁的定义,此时 doReleaseShared 中将要唤醒的节点一定是互斥锁节点。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared
如前所述,此处的 tryReleaseShared 方法来源于 ReentrantReadWriteLock,也因此该方法只强调整体逻辑而较少的去探究各个步骤的实现。
其中代码分为两个步骤,其一为将计数器减去本次释放的锁的重入数,其二为使用 CAS 操作更新最新的 state 值,最后如果共享锁被锁定的次数为0,则返回 true。
值得注意的是,读写锁对于首个获取共享锁的线程做了专门的优化,使用了 firstReader 记录该线程,且单独使用 firstReaderHoldCount 去记录该线程的重入次数。而其他线程对应的重入数记录在 ThreadLocal 之中,通过 readHolds.get() 获取。
网上很多说之所以使用 firstReader 是因为可以借此省去计数器(重入数)链表查询对应线程重入数的遍历消耗,但这是错的,因为像前面所说,对于重入数明明是记录在 ThreadLocal 之中,哪来的链表呢?
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 针对第一个读线程做单独优化
if (firstReader == current) {
// 如果第一个读线程的重入次数 = 1,则清空该线程
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 接下来是从 threadLocal 中去获取当前线程对应的重入数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 自旋修改 state
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
doReleaseShared
首先这个方法的主要作用是去唤醒后续等待的互斥锁节点,一共有两个分支,虽然代码很简单,但其实逻辑很绕,需要拆开分别进行叙述,不过无奈花费三四个小时仍然不能很完善的将这个方法的方方面面想清楚,只得可能稍是简陋甚至有些问题的描述。
从整体来讲,如果头结点不为空且头结点不为尾,那么代表头结点的后面是存在其他节点的,因此需要去做唤醒操作。那么 第一个分支就是如果 ws = SIGNAL 时通过 unparkSuccessor(h) 去唤醒后继节点,这个方法前面有讲。 通过前面讲过的 shouldParkAfterFailedAcquire 方法可以知道如果存在后继节点,那么现在的节点的状态会被改为 SIGNAL。
而第二个分支就是,如果 ws = 0 且执行CAS操作将 0 改为 PROPAGATE 失败了。值得注意的是,当这个条件成立的时候,代表本次自旋所处的环境是一种“中间态”。
这时虽然后续有节点入队了,但是当修改头结点为 SIGNAL 的时候,doReleaseShared 方法已经进入第二个分支所在地方了,因此开始新一轮重新去判定。
当新一轮对应的线程分为两种可能:
- 其一是成功获取到锁,那么自旋后会以新获取锁的节点进行一次自旋,但结果是没有任何影响,即使又进入一次唤醒后继节点也无所谓。
- 其二是又被其他节点抢走并替换,那么这个节点就阻塞等待被唤醒。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 清除头结点 SIGNAL 标记
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点
unparkSuccessor(h);
}
// 第二个分支,
// 如果 waitStatus == 0, 且不能将状态设置为 PPROPAGATE 则重试
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果此时 head 没有发生改变,代表唤醒操作完成
if (h == head) // loop if head changed
break;
}
}
条件阻塞队列的实现
通过 ReentrantLock 中的 newCondition 方法可知,所谓条件阻塞队列其实就是创建了一个名为 ConditionObject 的对象。
final ConditionObject newCondition() {
return new ConditionObject();
}
而这个对象根据内部结构可以看出是一个链表。
public class ConditionObject implements Condition {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
首先它的主要用处在于在拿到 AQS 锁的情况下,再次细分出一种“条件”的概念,如果条件不满足则释放该锁,反之放入条件队列中等待被其他线程唤醒(即条件达成的时候)。
其主要方法如下:
- await(),等待条件达成
- signal(),唤醒处于等待状态的首个线程,即条件已达成
- signalAll(),唤醒所有等待状态的线程
与此同时比较特殊的还有两种在等待期间的两种中断模式,分别如下:
- THROW_IE,表示中断且需要抛出 InterruptedException
- REINTERRUPT,表示中断但不需要抛出异常
await() 之流程
执行 await 流程中对于基本的等待逻辑其实不难,主要绕的是对于中断状态的处理。
首先,整体大概逻辑如下:
- 执行 addConditionWaiter() 将当前线程加入到条件阻塞队列里,简单的链表操作
- 执行 fullyRelease() 释放线程所持有的锁(资源),其实就是调用 release() 方法
- 阻塞,记录条件队列阻塞期间的异常
- 被唤醒后执行 acquireQueued 方法在队中阻塞到成功持有锁,如果在阻塞过程中收到异常,且在条件队列阻塞期间的异常不为 THROW_IE(要求抛出异常的中断),视为 REINTERRUPT(不抛出异常)
- 如果条件队列后面还有节点,调用 unlinkCancelledWaiters() 清理已经达成条件的节点
- 处理异常,如果是 THROW_IE 就抛出 InterruptedException,反之 REINTERRUPT 的话执行 selfInterrupt() 自行了结而不宣而告之。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程添加到条件阻塞队列中
Node node = addConditionWaiter();
// 释放当前线程所持有的锁(资源)
int savedState = fullyRelease(node);
// 如果当前不在AQS队列里就阻塞
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 如果期间出现中断信号跳出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 节点等待持有锁
// acquireQueued 返回的是在加入队列期间是否中断过
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除等待状态不等于 Condition 的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 处理中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
其中比较值得研究的是 checkInterruptWhileWaiting 方法,在其中根据中断出现的时间点判断是否应该抛出异常,即区分出属于 THROW_IE 还是 REINTERRUPT 两种状态。
如果是在 signal() 方法执行前出现的异常,则是 THROW_IE,反之是 REINTERRUPT 。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
signal() 与 signalAll 之流程
这里代码就比较简单了,选中链表首个节点,然后执行 doSignal 方法去做释放操作。
public final void signal() {
// 想要唤醒某个条件队列,需要先持有对应的锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
简单的链表操作,主要是通过 transferForSignal 方法进行释放操作,如果不成功且后续仍有节点,则循环继续,一直到成功释放一个节点。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
而 signal 与 signalAll 的主要区别就在于 doSignal(doSignalAll),其实就是去掉了一个 transferForSignal(first) = true 就退出的条件。
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
transferForSignal
transferForSignal 方法中主要疑惑的点在于,为什么 ws > 0(即取消状态)或者无法将等待状态修改为 SIGNAL 的时候需要唤醒当前节点?
仔细把整个条件队列的流程想了一下,就算没有唤醒这个节点,其实也是正常的!但是通过查阅很多资料得知,在这两个条件下可以先将线程唤醒去完成 await 方法后续的一些操作,如此以优化性能,反正 await 方法后面还会去调用 acquireQueued 方法去判定是否要在队列中阻塞。
不过为什么是这两个条件,我看了一个小时的资料也没想通,只有以下两个条件的时候满足这个分支:
- 前驱节点被取消
- 刚判定完 ws > 0 为 false 后,且在 CAS 操作后,前驱节点被取消
final boolean transferForSignal(Node node) {
// 如果无法从 CONDITION 状态更改为0,代表该节点并不在条件队列中
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 进行入队操作
// enq 方法会返回入队后的前驱节点
Node p = enq(node);
int ws = p.waitStatus;
// 如果前驱节点是被取消的或者无法将其设置为 SIGNAL 状态则唤醒当前线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}