0. 序
在 Java 中锁的使用有两种方式,一种是通过 JVM 虚拟机层面提供的 synchronized 关键字,另一种则是使用 JUC 框架,而 AQS(全称 AbstractQueuedSynchronizer)是 JUC 大部分同步组件(例如赫赫有名的 RenntrantLock,Semaphore)的基础。
AQS 提供了原子式管理同步状态,阻塞和唤醒线程功能以及队列模型的框架,它非常之常用和强大,其实比较简单,但对于初学者来说,理解起来会有些许难度。
我们都知道 RenntrantLock 是一种独占锁,所以本文会通过 ReentrantLock 为切入点,来了解 AQS 是如何被使用的,AQS 的独占模式是如何实现的,接着通过解读源码来剖析 AQS 的实现原理,深入 AQS,吃透 AQS。
1. 前置知识
文章开篇让我们先掌握几个前置知识点,帮助理解下文
公平锁 VS 非公平锁
举个生活中的例子,食堂内排队买饭,人们秩序有条,一个接一个的排队。
非公平顾名思义,你排的好好的,突然有个人来插你的队,排到了你前面,这就不公平了。
也就是说,线程 A,线程 B 在 AQS 队列中排队,如果线程 C 老老实实的排队,就是公平锁,如果线程 A
锁释放,线程 C 抢到锁时机在线程 B 出队过程中,线程 A 唤醒线程 B 之前,线程 C 就排到了线程 B 的前面,插队了,就是非公平锁。
重入锁 VS 不可重入锁
重入就是说是否同一个线程支持反复加锁释放锁,同一线程加锁几次就需要释放锁几次。
独占模式锁 VS 共享模式锁
所谓独占锁,就是只要有一个线程加锁,其他人都得干等着,这把锁属于某个线程独占,这就是独占锁。
所谓共享锁,就是可以和别的线程同时持有一把锁,比如读写锁,线程 A 加了读锁,线程 B 还是可以加读锁的,它们共享一把锁,这就是共享锁。
AQS类描述
AbstractQueuedSynchronizer 是 AQS 体系的核心基类(抽象类),使用了模版设计模式,它定义了锁的基本行为,实现了锁的基本存储结构,下面我们来一起了解下。
AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizer,此类比较简单,可以看到 transient 修饰的独占线程字段并提供了 set,get 方法获取,用于记录锁当前的持有者线程。
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
// 独占线程
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
回到 AbstractQueuedSynchronizer 类,来看其内部结构,它使用了是类模版设计模式,实现了锁的基本存储结构,定义了锁的行为。
State
它定义了字段 state,代表当前锁的状态,并用 volatile 关键字修饰,来应对并发场景
private volatile int state;
这里先说明下,state 为 0 代表当前锁并未被任何线程占用,state 大于 0 则代表有线程持有当前锁,其中 state 大于 1 表示重入。
提供了针对 state 的三个方法,其中值得一提的是 compareAndSetState 方法,通过 CAS 方式更新 state,来保证原子性。
// 获取state
protected final int getState() {
return state;
}
// 设置state
protected final void setState(int newState) {
state = newState;
}
// CAS更新state 该方法能够保证状态设置的原子性
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS 加锁和释放锁达到同步状态实际上就是通过 CAS 对 State 值进行修改达到。
这里简单带过,后续会做讲解。
CAS 是什么
要知道 volatile 只能保证可见性,无法保证原子性,查询并更新 state 的值并不是一个原子性操作,在多线程环境中并不安全。
而操作系统提供了一个新的 CPU 指令(CAS),CompareAndSwap,即比较并替换。具体的原理是在更新一个值之前,首先比较这个值是否发生了变化,如果确实发生了变化,那么就会更新失败,否则更新成功
AQS 的 CLH
接着继续来看 AQS 内部结构,你会发现这么两个字段,结构同为 Node 的 head 和 tail,见名知意,一个头节点,一个尾节点。
private transient volatile Node head;
private transient volatile Node tail;
Node 结构的定义如下:
可以看到 Node 中封装了线程 thread,并维护了前驱节点 prev,和后继节点 next,从而构造了一个 FIFO 的双向队列,同时定义了 waitStatus 表示当前节点在队列中的等待状态并提供了 waitStatus 的一些可能值的定义。
其实这里就大概能看出,AQS 是通过将每个请求共享资源的线程封装到一个 Node 节点中,通过 waitStatus 状态来实现锁的分配,prev,next 用于构建同步等待队列(阻塞队列),而 nextWatier 用于构建条件等待队列,这里需要注意的是阻塞队列是不包含 head 的。
// 双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。
static final class Node {
// 节点当前以共享模式等待锁
static final Node SHARED = new Node();
// 节点当前以独占模式等待锁
static final Node EXCLUSIVE = null;
// waitStatus状态为1,表示当前线程取消了争抢这个锁
static final int CANCELLED = 1;
// waitStatus状态为-1,其表示当前node的后继节点需要被唤醒。也就是说这个waitStatus其实代表的不是自己的状态,而是后继节点的状态
static final int SIGNAL = -1;
// waitStatus状态为-2,表示当前节点在等待队列中,等待唤醒
static final int CONDITION = -2;
// waitStatus状态为-3,共享模式狭隘释放锁才会使用
static final int PROPAGATE = -3;
// 当前节点在队列中的等待状态
volatile int waitStatus;
// 前驱指针,当节点添加到同步队列时被设置(尾部添加)
volatile Node prev;
// 后继指针
volatile Node next;
// 当前处于该节点的线程
volatile Thread thread;
// 指向下一个处于Condition状态的节点
Node nextWaiter;
}
这里笔者画了个图帮助各位小伙伴理解一下
看了上图之后,你也许很好奇线程 t1 所在的节点的前驱节点为什么是空节点呢,它的作用是什么,请先保留疑问,带着疑问,继续往下看。
AQS 方法定义
翻看 AQS 类代码的过程中,你可能会发现 AQS 作为被 abstract 修饰的抽象类,却没有抽象方法,而是单纯用 protected 修饰,这是为什么呢?
要知道上文我们知道 AQS 是 JUC 大部分同步组件的基类,它并不建议你直接 new AQS 来使用,且如果方法修饰了 abstract 则需要每个子类去实现,这是不合理的,所以它让子类按需实现。
而 AQS 中被 public 修饰的方法可以认定为模版方法,不建议子类覆盖实现。
下面来看下,AQS 被使用最多的方法定义:
方法名 | 描述 |
---|---|
protected boolean isHeldExclusively() | 该线程是否正在独占资源。只有用到 Condition 才需要去实现它。 |
protected boolean tryAcquire(int arg) | 独占方式。arg 为获取锁的次数,尝试获取资源,成功则返回 true,失败则返回 false。 |
protected boolean tryRelease(int arg) | 独占方式。arg 为释放锁的次数,尝试释放资源,成功则返回 true,失败则返回 false。 |
protected int tryAcquireShared(int arg) | 共享方式。arg 为获取锁的次数,尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
protected boolean tryReleaseShared(int arg) | 共享方式。arg 为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。 |
上述表格你会发现,tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 分别 2 对,一对实现独占,一对实现共享。也就是说 AQS 支持自定义同步组件同时实现独占和共享两种方式,非常强大。
这里你只需要大概清楚上表中每个方法的作用即可,后续会做详细讲解。
相信我们大家或多或少都有使用过 ReentrantLock,Semaphore,两者均为基于 AQS 的实现,
ReentrantLock实现了 AQS 的独占模式
Semaphore实现了 AQS 的共享模式
下面我们将通过解读这俩常用组件源码,来剖析 AQS 的原理。
2. ReentrantLock
ReentrantLock 实现了 AQS 的独占模式,它是一个可重入锁,同时提供了公平锁与非公平锁的实现。
来看结构,由于 Java 不支持多继承,所以由 ReentrantLock 继承抽象类 Lock,用内部类的方式继承 AQS,ReentrantLock 在具体实现锁时基本都是委托内部类 Sync,而 Sync 又继承自 AQS。Sync 内部有两个子类,分别是 FairSync(公平锁)与 NonfairSync(非公平锁)。
2.1 申请锁
来看加锁流程,首先来看 ReentrantLock 支持超时时间的锁申请
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
调用 Sync 的 tryAcquireNanos 方法
// 独占模式,忽略中断
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果线程的中断位标记为 true,表示应用方主动放弃锁的申请,可以直接抛出中断异常,结束锁的申请
if (Thread.interrupted())
throw new InterruptedException();
// 调用 Sync 的 tryAcquire 尝试获取锁。如果返回 true,表示成功获取锁,可以直接返回;不然就调用 doAcquireNanos,进入锁等待队列
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
tryAcquire 方法在具体的子类中实现
该方法返回 true 则表示获取到同步状态锁
没有线程在等待锁
重入锁,线程本来就持有锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
先来看 ReentrantLock 公平锁的实现,它在 FairSync 类中。
公平锁实现
protected final boolean tryAcquire(int acquires) {
// 确保获取当前申请锁的线程
final Thread current = Thread.currentThread();
// state 字段的含义是当前锁的重入次数
int c = getState();
//如果 state 为 0,表示当前锁并没有被占用 说明此时此刻锁是可以用的
if (c == 0) {
// 判断阻塞队列中有没有其他线程在排队
if (!hasQueuedPredecessors() &&
// CAS获取锁,失败说明刚刚同一时刻其他线程抢先了 因为刚刚判断 c == 0 证明没有锁被占用
compareAndSetState(0, acquires)) {
// 获取到锁,标记独占模式,占有锁
setExclusiveOwnerThread(current);
return true;
}
}
// 锁重入场景 不存在并发问题
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
首先获取当前线程,确保获取到当前申请锁的线程,
接着获取锁的当前状态,state,也就是我们上文提到的 state 为 0 代表当前锁并未被任何线程占用,state 大于 0 则代表有线程持有当前锁,其中 state 大于 1 表示重入。
上述代码流程中,如果 state 不为 0,表示锁已经被占用,但因为 ReentrantLock 支持可重入性,所以如果当前线程是锁的持有者,则只需要更新 state 的值,而不用 CAS 更新,否则就返回 false,这个线程最终会调用 doAcquireNanos,进入到同步阻塞队列,排队获取锁。
上述代码流程中,如果 state 为 0 则说明当前锁并没有被占用,则需要判断阻塞队列中有没有其他线程在排队,如果有,公平锁此时无法竞争锁,返回 false,尝试获取锁失败。这个线程最终会调用 doAcquireNanos,进入到同步阻塞队列,排队获取锁。
阻塞队列中没有其他排队的其他线程,则进行 CAS 获取锁,如果获取锁成功,则标记独占模式,占有锁。
来详细看下 hasQueuedPredecessors 方法的实现
实际上它做了这么一件事,判断当前线程是否位于 CLH 阻塞队列的第一个,是则返回true,否则返回false
public final boolean hasQueuedPredecessors() {
// 尾结点
Node t = tail;
// 头节点
Node h = head;
Node s;
// 头节点 != 尾结点 且 同步队列第一个节点不为 null 且 当前线程不是阻塞队列第一个节点
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
非公平锁实现
来看 tryAcquire 非公平锁的实现,它在 NonfairSync 类中。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); // 非公平锁获取锁
}
其实我们对比公平锁,可以发现代码几乎一致
不同点在于,非公平锁会首先和阻塞队列中的线程竞争,如果竞争成功,可以直接获取锁。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 和公平锁的不同点
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
进入阻塞队列
上文中我们了解到,公平锁和非公平锁的 tryAcquire,未获取锁成功,则会调用 doAcquireNanos,进入锁等待队列。
static final long spinForTimeoutThreshold = 1000L;
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 是否已超时
if (nanosTimeout <= 0L)
// 超时返回false 表示锁获取失败
return false;
final long deadline = System.nanoTime() + nanosTimeout;
// 独占模式 入队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 循环
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是Head则尝试再次获取锁 如果成功获取锁走if里面的逻辑
if (p == head && tryAcquire(arg)) {
// 将获取锁的节点设置为Head节点
setHead(node);
// help GC
p.next = null;
failed = false;
// 成功获取锁,返回true
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 如果超时,获取锁失败
if (nanosTimeout <= 0L)
return false;
// 没超时继续锁的申请流程 判断是否需要挂起当前线程
// 走到这里了要么当前node本来就不是队头,要么就是tryAcquire(arg)没有抢赢别人
if (shouldParkAfterFailedAcquire(p, node) &&
// 获取锁的剩余时间是否大于spinForTimeoutThreshold 如果是则通过自旋方式再次执行获取锁相关的逻辑。,否则将使线程阻塞
// spinForTimeoutThreshold 默认为 1s,避免线程切换带来的额外开销,因为如果本次锁申请距超时还剩不到 1s,就没有必要再阻塞线程,
nanosTimeout > spinForTimeoutThreshold)
// 带超时时间的线程阻塞
LockSupport.parkNanos(this, nanosTimeout);
// 否则检查线程的中断标记,是否被中断
if (Thread.interrupted())
// 中断,抛出中断异常,获取锁失败,返回false
throw new InterruptedException();
}
} finally {
if (failed)
// 将阻塞队列中对应节点的状态设置为CANCEL
cancelAcquire(node);
}
}
上述 boolean doAcquireNanos(int arg, long nanosTimeout)
代码实际做了这么几件事
(1)判断获取锁是不是已经超时,如果是,返回 false。
(2)以独占模式调用 addWaiter 加入阻塞队列(下文会详细解释)。
(3)获取节点的前驱节点,如果节点的前驱节点为头节点,再次调用 tryAcquire 方法尝试获取锁。
(4)如果成功获取锁,则将当前节点设置为 Head,表示当前它是锁的持有者。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
这里你可能会有疑惑,为什么头节点持有的线程对象是空呢?
也就是说头节点是锁的持有者,但实际上头节点却并不维护线程对象。
因为锁的持有者被记录在了 AQS 的 Thread exclusiveOwnerThread 属性中。
这样做的好处是:在实现非公平锁时,如果锁被新线程抢占,不需要更新头节点。
(5)如果当前节点不是头节点,或者没有成功获取锁,调用 shouldParkAfterFailedAcquire 判断当前线程是否需要阻塞,如果需要阻塞,则调用 LockSupport.parkNanos 阻塞线程。
简单说明下,如果前驱节点的状态为 SINGNAL = -1,则表示当前线程直接进入阻塞队列,排队获取锁。如果前驱节点的状态 > 0 时,对应状态 CANCELLED = 1,说明是已取消的节点,都需要被删除(这里运用了循环将当前节点前面状态 CANCELLED = 1 的连续取消节点删除,直到遇到非取消节点立即停止删除,类似分段思想,在多线程的场景,当前线程为段尾从后向前方向删除连续取消节点,并不影响后续其他线程操作节点,所以是线程安全的)
前驱节点的 waitStatus 不等于 -1 和 1,那也就是只可能是 0,-2,-3,对于 ReentrantLock 来说只可能是 0,如果前驱节点的状态为 0 也就是初始状态,那么需要将前驱节点的状态更新为 SIGNAL = -1,
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
不阻塞线程,而是外层 doAcquireNanos 通过自旋方式再次执行获取锁相关的逻辑。
// 第一个参数是前驱节点,第二个参数才是代表当前线程的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前一个节点的等待状态
int ws = pred.waitStatus;
// 说明前驱节点状态正常,则当前线程直接进入到阻塞队列,当前线程需要挂起,等待以后被唤醒
if (ws == Node.SIGNAL)
// 表明当前线程可以被park,安全的阻塞等待。
return true;
if (ws > 0) {
// (进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的
do {
// 如果前驱节点的状态大于 0 表明该线程的前驱节点已经等待超时或者取消或者被中断了,则需要从CLH队列中将该前一个节点删除掉,循环回溯,直到前一个节点状态 <= 0 遇到非取消节点立即停止删除 (循环将前面取消的节点都出队)
node.prev = pred = pred.prev;
// waitStatus > 0 说明前驱节点取消了排队
} while (pred.waitStatus > 0);
pred.next = node;
}
// 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
else {
// 通常情况下,每个新的node入队时,waitStatus都是0,前驱节点是之前的tail,那么它的waitStatus应该是0
// 每个node在入队的时候,都会把前驱节点的状态改为SIGNAL(-1),不阻塞线程,而是再次试图获取锁相关的逻辑。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 这个方法返回 false,那么会再走一次 for 循序,然后再次进来此方法,此时会从第一个分支 ws==Node.SIGNAL 返回 true
return false;
}
(6)否则检查线程的中断标记,是否被中断。
(7)最后将阻塞队列中对应节点的状态设置为 CANCEL
接着来详细看下 addWaiter 方法是如何处理的
tail 节点刚刚我们已经分析过了它是 CLH 双向链表构成的阻塞队列的尾结点,addWaiter 流程简单来说其实就是将线程包装成 Node 节点,方便后续对线程进行阻塞或唤醒,并将 Node 节点添加至 CLH 阻塞队列的最后面。
这里需要注意的是链表的操作在多线程环境中是不安全的,所以这里引入了 CAS 机制,通过 if (compareAndSetTail(pred, node))
来将自己设置为队尾,更新成功则结束,否则 enq(node) 无限重试直到更新成功。
private Node addWaiter(Node mode) {
// 创建一个 Node 节点,将线程对象存储在 Node 节点中
Node node = new Node(Thread.currentThread(), mode);
// 记录原尾节点
Node pred = tail;
// 说明有线程已经在该锁上等待
if (pred != null) {
// 添加新节点到队列尾部
node.prev = pred;
// CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴
if (compareAndSetTail(pred, node)) {
// cas成功,原尾结点的下一个节点为新节点
pred.next = node;
// 线程入队了,返回节点
return node;
}
}
// 直到 enq 方法的场景等待队列为空,或者cas失败有线程竞争入队。那么自旋入阻塞队列 则重试直到成功
enq(node);
return node;
}
private Node enq(final Node node) {
// 无限重试直到成功
for (;;) {
// 记录原尾节点
Node t = tail;
// 队列为空则重新创建为空头尾结点
if (t == null) {
// 此时Node waitstatus为0
if (compareAndSetHead(new Node()))
// tail指向head
tail = head;
}
// 将当前线程排到队尾,有线程竞争的话排不上重试
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
最后如果 doAcquireNanos
获取锁失败则执行 cancelAcquire
将阻塞队列中对应节点的状态设置为CANCEL
finally {
if (failed)
// 将阻塞队列中对应节点的状态设置为CANCEL
cancelAcquire(node);
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// 如果node是尾节点,CAS设置为新的尾结点
if (node == tail && compareAndSetTail(node, pred)) {
// 成功则cas设置pred的下一个节点为null置空
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
}
// pred为首节点
else {
// 唤醒 node 的下一个节点的线程等待
unparkSuccessor(node);
}
// help GC
node.next = node;
}
}
至此 RenntrantLock tryLock 尝试获取锁方法流程结束
这里笔者画了个流程图,帮助大家理解
同理其实 RenntrantLock Lock 逻辑类似,对应方法上文都已经解读过,这里不在重复说明了。
public final void acquire(int arg) {
// 尝试获取同步状态,获取成功设置锁状态并返回true,执行业务逻辑,否则获取失败返回false,需子类自己实现,获取成功就不需要进队列排队
if (!tryAcquire(arg) &&
// 生成Node节点,需要把当前线程挂起。加入到CLH阻塞队列,
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 获取到锁,中断线程
selfInterrupt();
}
如果 tryAcquire(int arg)
方法返回 false ,即获取同步状态失败,则调用 addWaiter(Node mode)
方法,将当前线程加入到 CLH 同步队列尾部。并且, mode
方法参数为 Node.EXCLUSIVE
,表示独占模式。
调用 boolean #acquireQueued(Node node, int arg)
方法,自旋直到获得同步状态成功。当返回 true 时,表示在这个过程中,发生过线程中断。
也就是满足 parkAndCheckInterrupt
条件(下文会详细介绍这里),设置了 interrupted 标识为 true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
parkAndCheckInterrupt
中返回了 Thread.interrupted()
这个方法会返回当前线程的中断状态,但是这个方法会清除线程中断的标记。
所以在种情况下,外层 acquire 流程中会执行 selfInterrupt()
方法,恢复线程中断的标记。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 恢复线程中断的标记
selfInterrupt();
}
下面代码为一个自旋的过程,当前线程节点进入同步队列后,就会进入一个自旋的过程,每个节点都会自省观察,当条件满足,获取到锁后,就可以从这个自旋过程中退出,否则会一直执行下去。
// acquireQueued方法可以对排队中的线程进行获锁操作。
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 记录过程中,是否发生了线程中断
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,当前节点在真实数据队列的首部,因为它的前驱是head。注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列,所以当前节点可以去试抢一下锁
if (p == head && tryAcquire(arg)) {
// 获取锁成功,设置当前节点为新的头节点
setHead(node);
// 设置老的头节点p不在指向下一个节点,加速gc
p.next = null;
// 获取同步状态成功
failed = false;
// 返回记录状态
return interrupted;
}
// 走到这里了要么当前node本来就不是队头,要么就是tryAcquire(arg)没有抢赢别人 需要判断是否需要挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
// 负责挂起线程
parkAndCheckInterrupt())
// 不响应中断 如果等待过程中被中断过就将interrupted标记置为true
interrupted = true;
}
} finally {
// 获取同步状态发生异常,取消获取
if (failed)
cancelAcquire(node);
}
}
parkAndCheckInterrupt()
的作用是阻塞当前线程,并且返回线程被唤醒之后的中断状态。
它会先通过 LockSupport.park()
阻塞当前线程。当前驱节点对应的线程使用完锁之后,通过 unpark() 方式唤醒当前线程时,阻塞线程会在此处被唤醒。
private final boolean parkAndCheckInterrupt() {
// 挂起,进入阻塞等待唤醒的状态,等待被唤醒
LockSupport.park(this);
// 在线程被唤醒时,被唤醒后回到 if (p == head && tryAcquire(arg))
// Thread.interrupted 返回当前线程是否被中断,并清除中断标记位。所以当该方法返回true(被中断)时,需要手动补偿中断标记位。
return Thread.interrupted();
}
结合刚刚分析的 acquireQueued() 流程。当前线程被中断过,则执行selfInterrupt();
,否则不会执行。
也就是说在 acquireQueued() 流程中,即使是线程在阻塞状态被中断唤醒而获取到 cpu 执行权利,但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。该线程会再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒,线程被唤醒后会继续尝试获取锁,如果获取锁成功时 acquireQueued()
流程会返回 true。
但在该线程成功获取锁成功之前,它的中断会被忽略并且中断标记会被清除。
这是因为在 parkAndCheckInterrupt()
中,我们线程的中断时调用了 Thread.interrupted()
。
该函数不同于 Thread 的 isInterrupted() 函数,isInterrupted() 仅仅返回中断状态,而interrupted() 在返回当前中断状态之后,还会清除中断标记。
正因为之前的中断标记被清除了,所以这里需要调用 selfInterrupt() 重新产生一个中断。
如果你对 interrupt() 和 interrupted() 的使用还有疑惑,可以参考这篇文章
interrupt() VS interrupted() VS interrupted()》)
非公平锁
final void lock() {
// CAS设置state成功(获取锁成功)将当前线程设置为独占线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// CAS设置state失败(获取锁失败)调用AQS#acquire(1)方法获取同步状态
else
acquire(1);
}
公平锁
final void lock() {
// 调用AQS#acquire(1)方法获取同步状态
acquire(1);
}
这里笔者也画了个流程图,帮助大家理解
2.2 释放锁
上文对申请锁流程做了解读,下面我们来看 RenntrantLock 是如何释放锁的。
RenntrantLock 的 unlock 方法调用了 sync.release(1);
public void unlock() {
sync.release(1);
}
下述代码是 AQS 中 relsease 的实现
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
// 释放成功则获取头节点Head
Node h = head;
// 如果头节点不为空且waitStatus不为0,说明需要唤醒第一个未取消的后继节点
if (h != null && h.waitStatus != 0)
// 唤醒第一个未取消的后继节点
unparkSuccessor(h);
// 否则释放锁成功
return true;
}
return false;
}
relsease
方法它实际上做了这么几件事
(1)尝试释放锁
RenntrantLock 实现了 tryRelease 方法
首先判断了当前线程是不是锁的持有者,如果不是则抛出异常,否则判断锁剩余占有次数,如果为 0,则说明锁已释放,需要唤醒阻塞队列中其它排队线程。
这里需要注意:如果一个锁被同一个线程重入 n 次,那对应也要释放 n 次。当持有次数为 0 时,表示可以释放锁
protected final boolean tryRelease(int releases) {
// 用当前所的重入次数减去本次释放的次数
int c = getState() - releases;
// 当前线程是不是锁的持有者
if (Thread.currentThread() != getExclusiveOwnerThread())
// 不是则异常
throw new IllegalMonitorStateException();
// 当前线程是锁的持有者,是否完全释放锁
boolean free = false;
// c==0,代表完全释放(如果一个锁被同一个线程重入 n 次,那对应也要释放 n 次。当持有次数为 0 时,表示可以释放锁),其他线程可以获取同步状态了
if (c == 0) {
// 标记是否成功
free = true;
// 并将锁的占用者置空
setExclusiveOwnerThread(null);
}
// 更新锁的状态(持有次数)
setState(c);
return free;
(2)tryRelease(arg) 尝试获取锁成功,需要唤醒阻塞队列中下一个线程
这里需要注意,如果使用的是非公平锁,新的线程此时可以直接获取到锁,插队,唤醒的线程没获取到锁就需要进入阻塞队列中排队。
这里的判断条件为什么是h != null && h.waitStatus != 0 呢?
h null Head还没初始化。初始情况下,head null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。
h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。
从阻塞队列中查找下一个待唤醒的线程对应的是 unparkSuccessor 方法
// 入参是头节点
private void unparkSuccessor(Node node) {
// 当前节点状态
int ws = node.waitStatus;
// 如果head节点当前waitStatus<0, CAS将其修改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 当前节点后继节点
Node s = node.next;
// 唤醒后继节点,从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
// 后继节点为空或者状态>0 (超时或中断)
if (s == null || s.waitStatus > 0) {
s = null;
// 从链表的尾部开始寻找,找到头节点后面的第一个非取消节点。
for (Node t = tail; t != null && t != node; t = t.prev)
// 从tail寻找可用节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒节点线程, 线程被唤醒后会继续去竞争锁。
LockSupport.unpark(s.thread);
}
上述 void unparkSuccessor(Node node)
方法做了这么几件事
(1)确认头节点的状态。如果头节点的状态不为 0,则更新为 0
(2)入参是头节点,在释放锁时,需要找到头节点下一个未取消的节点
(3)从链表的尾部开始寻找,找到头节点后面的第一个非取消节点。
这里说明一下,因为我们在维护节点的前驱节点时使用了 CAS,通过前驱节点遍历是可靠的,不会遗漏节点。
这里你可能会有疑问为什么是从 tail
尾节点开始寻找,而不是从 node.next
开始呢?
之所以从尾部开始遍历,是为了能保证遍历到所有的节点。
比如,当前 node 是前 tail 节点,新的 node2 正在变成 tail 节点,但是 addWaiter
中 pred.next = node;
并不是原子操作,很可能这步还未来得及执行,如果正向遍历,node.next,会为null,就会遗漏新加入的节点,但是从 tail 开始遍历肯定没问题,因为在设置 tail 节点时,compareAndSetTail(pred, node)
是原子操作
(4)找到对应的节点,调用 LockSupport.unpark 方法唤醒线程。线程被唤醒后会继续去竞争锁。这里唤醒的是申请锁时用 LockSupport.park 阻塞的线程,因为这样可以让锁的申请和释放形成闭环通道。
唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 刚刚线程被挂起在这里了
return Thread.interrupted();
}
之后会走到这个逻辑 acquireQueued(final Node node, int arg)
,这个时候,node的前驱是head。
这里笔者也画了个流程图,帮助大家理解
3. 小结
简单总结下
其实在 AQS 中维护着一个 CLH 的阻塞队列
当线程获取同步状态失败后,则会加入到 CLH 阻塞队列的队尾,并一直保持着自旋。
在 CLH 阻塞队列中的线程在自旋时,会判断其前驱节点是否为首节点,如果为首节点则不断尝试获取同步状态,获取成功则退出 CLH 阻塞队列。
当线程执行完逻辑后,会释放同步状态,释放后会唤醒其后继节点。
4. 参考
https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
评论区