侧边栏壁纸
博主头像
灬编程小子博主等级

写程序就是一个不断追求完美的过程

  • 累计撰写 4 篇文章
  • 累计创建 1 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录
JUC

AQS独占模式实现原理

灬编程小子
2023-03-10 / 0 评论 / 0 点赞 / 151 阅读 / 33520 字

0. 序

在 Java 中锁的使用有两种方式,一种是通过 JVM 虚拟机层面提供的 synchronized 关键字,另一种则是使用 JUC 框架,而 AQS(全称 AbstractQueuedSynchronizer)是 JUC 大部分同步组件(例如赫赫有名的 RenntrantLockSemaphore)的基础。

AQS 提供了原子式管理同步状态,阻塞和唤醒线程功能以及队列模型的框架,它非常之常用和强大,其实比较简单,但对于初学者来说,理解起来会有些许难度。

我们都知道 RenntrantLock 是一种独占锁,所以本文会通过 ReentrantLock 为切入点,来了解 AQS 是如何被使用的,AQS 的独占模式是如何实现的,接着通过解读源码来剖析 AQS 的实现原理,深入 AQS,吃透 AQS。

1. 前置知识

文章开篇让我们先掌握几个前置知识点,帮助理解下文

公平锁 VS 非公平锁

举个生活中的例子,食堂内排队买饭,人们秩序有条,一个接一个的排队。

非公平顾名思义,你排的好好的,突然有个人来插你的队,排到了你前面,这就不公平了。

也就是说,线程 A,线程 B 在 AQS 队列中排队,如果线程 C 老老实实的排队,就是公平锁,如果线程 A

锁释放,线程 C 抢到锁时机在线程 B 出队过程中,线程 A 唤醒线程 B 之前,线程 C 就排到了线程 B 的前面,插队了,就是非公平锁。

重入锁 VS 不可重入锁

重入就是说是否同一个线程支持反复加锁释放锁,同一线程加锁几次就需要释放锁几次。

独占模式锁 VS 共享模式锁

所谓独占锁,就是只要有一个线程加锁,其他人都得干等着,这把锁属于某个线程独占,这就是独占锁。

所谓共享锁,就是可以和别的线程同时持有一把锁,比如读写锁,线程 A 加了读锁,线程 B 还是可以加读锁的,它们共享一把锁,这就是共享锁。

AQS类描述

AbstractQueuedSynchronizer 是 AQS 体系的核心基类(抽象类),使用了模版设计模式,它定义了锁的基本行为,实现了锁的基本存储结构,下面我们来一起了解下。

AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizer,此类比较简单,可以看到 transient 修饰的独占线程字段并提供了 set,get 方法获取,用于记录锁当前的持有者线程。

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

      // 独占线程
    private transient Thread exclusiveOwnerThread; 

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
  
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

回到 AbstractQueuedSynchronizer 类,来看其内部结构,它使用了是类模版设计模式,实现了锁的基本存储结构,定义了锁的行为。

State

它定义了字段 state,代表当前锁的状态,并用 volatile 关键字修饰,来应对并发场景

private volatile int state; 

这里先说明下,state 为 0 代表当前锁并未被任何线程占用,state 大于 0 则代表有线程持有当前锁,其中 state 大于 1 表示重入。

提供了针对 state 的三个方法,其中值得一提的是 compareAndSetState 方法,通过 CAS 方式更新 state,来保证原子性。

 // 获取state    
protected final int getState() {
        return state;
    }

// 设置state
    protected final void setState(int newState) {
        state = newState; 
    }

   // CAS更新state 该方法能够保证状态设置的原子性
    protected final boolean compareAndSetState(int expect, int update) {  
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 
    }

AQS 加锁和释放锁达到同步状态实际上就是通过 CAS 对 State 值进行修改达到。

这里简单带过,后续会做讲解。

CAS 是什么

要知道 volatile 只能保证可见性,无法保证原子性,查询并更新 state 的值并不是一个原子性操作,在多线程环境中并不安全。

而操作系统提供了一个新的 CPU 指令(CAS),CompareAndSwap,即比较并替换。具体的原理是在更新一个值之前,首先比较这个值是否发生了变化,如果确实发生了变化,那么就会更新失败,否则更新成功

AQS 的 CLH

接着继续来看 AQS 内部结构,你会发现这么两个字段,结构同为 Node 的 head 和 tail,见名知意,一个头节点,一个尾节点。


private transient volatile Node head; 

private transient volatile Node tail;

Node 结构的定义如下:

可以看到 Node 中封装了线程 thread,并维护了前驱节点 prev,和后继节点 next,从而构造了一个 FIFO 的双向队列,同时定义了 waitStatus 表示当前节点在队列中的等待状态并提供了 waitStatus 的一些可能值的定义。

其实这里就大概能看出,AQS 是通过将每个请求共享资源的线程封装到一个 Node 节点中,通过 waitStatus 状态来实现锁的分配,prev,next 用于构建同步等待队列(阻塞队列),而 nextWatier 用于构建条件等待队列,这里需要注意的是阻塞队列是不包含 head 的。

// 双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。     
static final class Node { 
     
        // 节点当前以共享模式等待锁
        static final Node SHARED = new Node(); 
        
             // 节点当前以独占模式等待锁
        static final Node EXCLUSIVE = null; 
 
             // waitStatus状态为1,表示当前线程取消了争抢这个锁
        static final int CANCELLED =  1; 
       
             // waitStatus状态为-1,其表示当前node的后继节点需要被唤醒。也就是说这个waitStatus其实代表的不是自己的状态,而是后继节点的状态
        static final int SIGNAL    = -1; 
     
        // waitStatus状态为-2,表示当前节点在等待队列中,等待唤醒
        static final int CONDITION = -2;
      
              // waitStatus状态为-3,共享模式狭隘释放锁才会使用
        static final int PROPAGATE = -3; 

              // 当前节点在队列中的等待状态
        volatile int waitStatus; 
       
              // 前驱指针,当节点添加到同步队列时被设置(尾部添加)
        volatile Node prev; 
          
              // 后继指针
        volatile Node next; 

              // 当前处于该节点的线程
        volatile Thread thread; 

              // 指向下一个处于Condition状态的节点
        Node nextWaiter; 

   }   
    

这里笔者画了个图帮助各位小伙伴理解一下

AQS-独占-1.png

看了上图之后,你也许很好奇线程 t1 所在的节点的前驱节点为什么是空节点呢,它的作用是什么,请先保留疑问,带着疑问,继续往下看。

AQS 方法定义

翻看 AQS 类代码的过程中,你可能会发现 AQS 作为被 abstract 修饰的抽象类,却没有抽象方法,而是单纯用 protected 修饰,这是为什么呢?

要知道上文我们知道 AQS 是 JUC 大部分同步组件的基类,它并不建议你直接 new AQS 来使用,且如果方法修饰了 abstract 则需要每个子类去实现,这是不合理的,所以它让子类按需实现。

而 AQS 中被 public 修饰的方法可以认定为模版方法,不建议子类覆盖实现。

下面来看下,AQS 被使用最多的方法定义:

方法名

描述

protected boolean isHeldExclusively()

该线程是否正在独占资源。只有用到 Condition 才需要去实现它。

protected boolean tryAcquire(int arg)

独占方式。arg 为获取锁的次数,尝试获取资源,成功则返回 true,失败则返回 false。

protected boolean tryRelease(int arg)

独占方式。arg 为释放锁的次数,尝试释放资源,成功则返回 true,失败则返回 false。

protected int tryAcquireShared(int arg)

共享方式。arg 为获取锁的次数,尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

protected boolean tryReleaseShared(int arg)

共享方式。arg 为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。

上述表格你会发现,tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 分别 2 对,一对实现独占,一对实现共享。也就是说 AQS 支持自定义同步组件同时实现独占和共享两种方式,非常强大。

这里你只需要大概清楚上表中每个方法的作用即可,后续会做详细讲解。

相信我们大家或多或少都有使用过 ReentrantLock,Semaphore,两者均为基于 AQS 的实现,

  • ReentrantLock实现了 AQS 的独占模式

  • Semaphore实现了 AQS 的共享模式

下面我们将通过解读这俩常用组件源码,来剖析 AQS 的原理。

2. ReentrantLock

ReentrantLock 实现了 AQS 的独占模式,它是一个可重入锁,同时提供了公平锁非公平锁的实现。

来看结构,由于 Java 不支持多继承,所以由 ReentrantLock 继承抽象类 Lock,用内部类的方式继承 AQS,ReentrantLock 在具体实现锁时基本都是委托内部类 Sync,而 Sync 又继承自 AQS。Sync 内部有两个子类,分别是 FairSync(公平锁)与 NonfairSync(非公平锁)。

AQS-独占-2.png

2.1 申请锁

来看加锁流程,首先来看 ReentrantLock 支持超时时间的锁申请

  public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

调用 Sync 的 tryAcquireNanos 方法

// 独占模式,忽略中断 
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
       // 如果线程的中断位标记为 true,表示应用方主动放弃锁的申请,可以直接抛出中断异常,结束锁的申请
        if (Thread.interrupted()) 
            throw new InterruptedException();
     // 调用 Sync 的 tryAcquire 尝试获取锁。如果返回 true,表示成功获取锁,可以直接返回;不然就调用 doAcquireNanos,进入锁等待队列
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

tryAcquire 方法在具体的子类中实现

该方法返回 true 则表示获取到同步状态锁

  1. 没有线程在等待锁

  2. 重入锁,线程本来就持有锁

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

先来看 ReentrantLock 公平锁的实现,它在 FairSync 类中。

公平锁实现

  protected final boolean tryAcquire(int acquires) {
           // 确保获取当前申请锁的线程
            final Thread current = Thread.currentThread();
          // state 字段的含义是当前锁的重入次数
            int c = getState(); 
          //如果 state 为 0,表示当前锁并没有被占用 说明此时此刻锁是可以用的
            if (c == 0) {
               // 判断阻塞队列中有没有其他线程在排队
                if (!hasQueuedPredecessors() &&
                     // CAS获取锁,失败说明刚刚同一时刻其他线程抢先了 因为刚刚判断 c == 0 证明没有锁被占用
                    compareAndSetState(0, acquires)) {
                   // 获取到锁,标记独占模式,占有锁
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
                 // 锁重入场景 不存在并发问题
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

首先获取当前线程,确保获取到当前申请锁的线程,

接着获取锁的当前状态,state,也就是我们上文提到的 state 为 0 代表当前锁并未被任何线程占用,state 大于 0 则代表有线程持有当前锁,其中 state 大于 1 表示重入。

上述代码流程中,如果 state 不为 0,表示锁已经被占用,但因为 ReentrantLock 支持可重入性,所以如果当前线程是锁的持有者,则只需要更新 state 的值,而不用 CAS 更新,否则就返回 false,这个线程最终会调用 doAcquireNanos,进入到同步阻塞队列,排队获取锁。

上述代码流程中,如果 state 为 0 则说明当前锁并没有被占用,则需要判断阻塞队列中有没有其他线程在排队,如果有,公平锁此时无法竞争锁,返回 false,尝试获取锁失败。这个线程最终会调用 doAcquireNanos,进入到同步阻塞队列,排队获取锁。

阻塞队列中没有其他排队的其他线程,则进行 CAS 获取锁,如果获取锁成功,则标记独占模式,占有锁。

来详细看下 hasQueuedPredecessors 方法的实现

实际上它做了这么一件事,判断当前线程是否位于 CLH 阻塞队列的第一个,是则返回true,否则返回false

  public final boolean hasQueuedPredecessors() { 
        // 尾结点
        Node t = tail;
        // 头节点
        Node h = head;
        Node s;
        // 头节点 != 尾结点 且 同步队列第一个节点不为 null 且 当前线程不是阻塞队列第一个节点
        return h != t && 
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

非公平锁实现

来看 tryAcquire 非公平锁的实现,它在 NonfairSync 类中。

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires); // 非公平锁获取锁
}

其实我们对比公平锁,可以发现代码几乎一致

不同点在于,非公平锁会首先和阻塞队列中的线程竞争,如果竞争成功,可以直接获取锁。

 final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState(); 
            if (c == 0) { 
                  // 和公平锁的不同点
                if (compareAndSetState(0, acquires)) { 
                    setExclusiveOwnerThread(current); 
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires; 
                if (nextc < 0) 
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

进入阻塞队列

上文中我们了解到,公平锁和非公平锁的 tryAcquire,未获取锁成功,则会调用 doAcquireNanos,进入锁等待队列。

 static final long spinForTimeoutThreshold = 1000L;

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
               // 是否已超时
        if (nanosTimeout <= 0L)   
          // 超时返回false 表示锁获取失败
            return false; 
        final long deadline = System.nanoTime() + nanosTimeout;
                  // 独占模式 入队列
        final Node node = addWaiter(Node.EXCLUSIVE); 
        boolean failed = true;
        try {
          // 循环
            for (;;) { 
              // 获取节点的前驱节点
                final Node p = node.predecessor();
              // 如果前驱节点是Head则尝试再次获取锁 如果成功获取锁走if里面的逻辑
                if (p == head && tryAcquire(arg)) { 
                  // 将获取锁的节点设置为Head节点
                    setHead(node);
                  // help GC
                    p.next = null; 
                    failed = false;
                  // 成功获取锁,返回true
                    return true; 
                }
                nanosTimeout = deadline - System.nanoTime();
              // 如果超时,获取锁失败
                if (nanosTimeout <= 0L) 
                    return false;
              // 没超时继续锁的申请流程 判断是否需要挂起当前线程
              // 走到这里了要么当前node本来就不是队头,要么就是tryAcquire(arg)没有抢赢别人     
                if (shouldParkAfterFailedAcquire(p, node) && 
                    // 获取锁的剩余时间是否大于spinForTimeoutThreshold 如果是则通过自旋方式再次执行获取锁相关的逻辑。,否则将使线程阻塞
                    // spinForTimeoutThreshold 默认为 1s,避免线程切换带来的额外开销,因为如果本次锁申请距超时还剩不到 1s,就没有必要再阻塞线程,
                    nanosTimeout > spinForTimeoutThreshold) 
                  // 带超时时间的线程阻塞
                    LockSupport.parkNanos(this, nanosTimeout); 
              // 否则检查线程的中断标记,是否被中断
                if (Thread.interrupted())
                  // 中断,抛出中断异常,获取锁失败,返回false
                    throw new InterruptedException(); 
            }
        } finally {
            if (failed)
              // 将阻塞队列中对应节点的状态设置为CANCEL
                cancelAcquire(node);
        }
    }

上述 boolean doAcquireNanos(int arg, long nanosTimeout) 代码实际做了这么几件事

(1)判断获取锁是不是已经超时,如果是,返回 false。

(2)以独占模式调用 addWaiter 加入阻塞队列(下文会详细解释)。

(3)获取节点的前驱节点,如果节点的前驱节点为头节点,再次调用 tryAcquire 方法尝试获取锁。

(4)如果成功获取锁,则将当前节点设置为 Head,表示当前它是锁的持有者。

 private void setHead(Node node) {
        head = node;
        node.thread = null; 
        node.prev = null;
    }

这里你可能会有疑惑,为什么头节点持有的线程对象是空呢?

也就是说头节点是锁的持有者,但实际上头节点却并不维护线程对象。

因为锁的持有者被记录在了 AQS 的 Thread exclusiveOwnerThread 属性中。

这样做的好处是:在实现非公平锁时,如果锁被新线程抢占,不需要更新头节点。

(5)如果当前节点不是头节点,或者没有成功获取锁,调用 shouldParkAfterFailedAcquire 判断当前线程是否需要阻塞,如果需要阻塞,则调用 LockSupport.parkNanos 阻塞线程。

简单说明下,如果前驱节点的状态为 SINGNAL = -1,则表示当前线程直接进入阻塞队列,排队获取锁。如果前驱节点的状态 > 0 时,对应状态 CANCELLED = 1,说明是已取消的节点,都需要被删除(这里运用了循环将当前节点前面状态 CANCELLED = 1 的连续取消节点删除,直到遇到非取消节点立即停止删除,类似分段思想,在多线程的场景,当前线程为段尾从后向前方向删除连续取消节点,并不影响后续其他线程操作节点,所以是线程安全的)

​ 前驱节点的 waitStatus 不等于 -1 和 1,那也就是只可能是 0,-2,-3,对于 ReentrantLock 来说只可能是 0,如果前驱节点的状态为 0 也就是初始状态,那么需要将前驱节点的状态更新为 SIGNAL = -1,

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

不阻塞线程,而是外层 doAcquireNanos 通过自旋方式再次执行获取锁相关的逻辑。

 // 第一个参数是前驱节点,第二个参数才是代表当前线程的节点    
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  // 获取前一个节点的等待状态
        int ws = pred.waitStatus; 
  // 说明前驱节点状态正常,则当前线程直接进入到阻塞队列,当前线程需要挂起,等待以后被唤醒
        if (ws == Node.SIGNAL) 
            // 表明当前线程可以被park,安全的阻塞等待。
            return true;
        if (ws > 0) {
          // (进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的
            do { 
              // 如果前驱节点的状态大于 0 表明该线程的前驱节点已经等待超时或者取消或者被中断了,则需要从CLH队列中将该前一个节点删除掉,循环回溯,直到前一个节点状态 <= 0 遇到非取消节点立即停止删除 (循环将前面取消的节点都出队)
                node.prev = pred = pred.prev; 
              //  waitStatus > 0 说明前驱节点取消了排队
            } while (pred.waitStatus > 0); 
            pred.next = node;
        } 
  // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
  else { 
          // 通常情况下,每个新的node入队时,waitStatus都是0,前驱节点是之前的tail,那么它的waitStatus应该是0 
    // 每个node在入队的时候,都会把前驱节点的状态改为SIGNAL(-1),不阻塞线程,而是再次试图获取锁相关的逻辑。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 
        }
    // 这个方法返回 false,那么会再走一次 for 循序,然后再次进来此方法,此时会从第一个分支 ws==Node.SIGNAL 返回 true  
        return false;
    }

(6)否则检查线程的中断标记,是否被中断。

(7)最后将阻塞队列中对应节点的状态设置为 CANCEL


接着来详细看下 addWaiter 方法是如何处理的

tail 节点刚刚我们已经分析过了它是 CLH 双向链表构成的阻塞队列的尾结点,addWaiter 流程简单来说其实就是将线程包装成 Node 节点,方便后续对线程进行阻塞或唤醒,并将 Node 节点添加至 CLH 阻塞队列的最后面。

这里需要注意的是链表的操作在多线程环境中是不安全的,所以这里引入了 CAS 机制,通过 if (compareAndSetTail(pred, node)) 来将自己设置为队尾,更新成功则结束,否则 enq(node) 无限重试直到更新成功。

    private Node addWaiter(Node mode) {
        // 创建一个 Node 节点,将线程对象存储在 Node 节点中
        Node node = new Node(Thread.currentThread(), mode); 
        // 记录原尾节点
        Node pred = tail; 
          // 说明有线程已经在该锁上等待
        if (pred != null) { 
          // 添加新节点到队列尾部
            node.prev = pred; 
              // CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴
            if (compareAndSetTail(pred, node)) { 
              // cas成功,原尾结点的下一个节点为新节点
                pred.next = node; 
              // 线程入队了,返回节点
                return node; 
            }
        }
       // 直到 enq 方法的场景等待队列为空,或者cas失败有线程竞争入队。那么自旋入阻塞队列 则重试直到成功
        enq(node); 
        return node;
    }
   private Node enq(final Node node) {
             // 无限重试直到成功
        for (;;) { 
           // 记录原尾节点
            Node t = tail;
           // 队列为空则重新创建为空头尾结点
            if (t == null) {
              // 此时Node waitstatus为0
                if (compareAndSetHead(new Node()))
                  // tail指向head
                    tail = head; 
            } 
          // 将当前线程排到队尾,有线程竞争的话排不上重试
          else { 
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

最后如果 doAcquireNanos 获取锁失败则执行 cancelAcquire 将阻塞队列中对应节点的状态设置为CANCEL

finally {
            if (failed)
                  // 将阻塞队列中对应节点的状态设置为CANCEL
                cancelAcquire(node);
        }
 private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;
     
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

          Node predNext = pred.next;

           node.waitStatus = Node.CANCELLED;

                   // 如果node是尾节点,CAS设置为新的尾结点
         if (node == tail && compareAndSetTail(node, pred)) { 
            // 成功则cas设置pred的下一个节点为null置空
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            }
           // pred为首节点
           else { 
             // 唤醒 node 的下一个节点的线程等待
                unparkSuccessor(node); 
            }
                    // help GC
            node.next = node; 
        }
    }

至此 RenntrantLock tryLock 尝试获取锁方法流程结束

这里笔者画了个流程图,帮助大家理解

这里是流程图的链接,点这里


同理其实 RenntrantLock Lock 逻辑类似,对应方法上文都已经解读过,这里不在重复说明了。

public final void acquire(int arg) {
              // 尝试获取同步状态,获取成功设置锁状态并返回true,执行业务逻辑,否则获取失败返回false,需子类自己实现,获取成功就不需要进队列排队
        if (!tryAcquire(arg) && 
            // 生成Node节点,需要把当前线程挂起。加入到CLH阻塞队列,
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
          // 获取到锁,中断线程
            selfInterrupt(); 
    }

如果 tryAcquire(int arg) 方法返回 false ,即获取同步状态失败,则调用 addWaiter(Node mode) 方法,将当前线程加入到 CLH 同步队列尾部。并且, mode 方法参数为 Node.EXCLUSIVE ,表示独占模式。

调用 boolean #acquireQueued(Node node, int arg) 方法,自旋直到获得同步状态成功。当返回 true 时,表示在这个过程中,发生过线程中断

也就是满足 parkAndCheckInterrupt 条件(下文会详细介绍这里),设置了 interrupted 标识为 true

  if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;

parkAndCheckInterrupt 中返回了 Thread.interrupted() 这个方法会返回当前线程的中断状态,但是这个方法会清除线程中断的标记

所以在种情况下,外层 acquire 流程中会执行 selfInterrupt() 方法,恢复线程中断的标记

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 恢复线程中断的标记
            selfInterrupt();
    }

下面代码为一个自旋的过程,当前线程节点进入同步队列后,就会进入一个自旋的过程,每个节点都会自省观察,当条件满足,获取到锁后,就可以从这个自旋过程中退出,否则会一直执行下去。

   // acquireQueued方法可以对排队中的线程进行获锁操作。
final boolean acquireQueued(final Node node, int arg) { 
  // 标记是否成功拿到资源
        boolean failed = true; 
        try {
          // 记录过程中,是否发生了线程中断
            boolean interrupted = false; 
           // 开始自旋,要么获取锁,要么中断
            for (;;) { 
              // 获取当前节点的前驱节点
                final Node p = node.predecessor(); 
               // p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,当前节点在真实数据队列的首部,因为它的前驱是head。注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列,所以当前节点可以去试抢一下锁
                if (p == head && tryAcquire(arg)) {
                  // 获取锁成功,设置当前节点为新的头节点
                    setHead(node); 
                  // 设置老的头节点p不在指向下一个节点,加速gc
                    p.next = null; 
                   // 获取同步状态成功
                    failed = false;
                  // 返回记录状态
                    return interrupted; 
                }
               // 走到这里了要么当前node本来就不是队头,要么就是tryAcquire(arg)没有抢赢别人 需要判断是否需要挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) && 
                    // 负责挂起线程
                    parkAndCheckInterrupt()) 
                   // 不响应中断 如果等待过程中被中断过就将interrupted标记置为true
                    interrupted = true;
            }
        } finally {
          // 获取同步状态发生异常,取消获取
            if (failed) 
                cancelAcquire(node);
        }
    }

parkAndCheckInterrupt() 的作用是阻塞当前线程,并且返回线程被唤醒之后的中断状态。
它会先通过 LockSupport.park() 阻塞当前线程。当前驱节点对应的线程使用完锁之后,通过 unpark() 方式唤醒当前线程时,阻塞线程会在此处被唤醒。

private final boolean parkAndCheckInterrupt() {
  // 挂起,进入阻塞等待唤醒的状态,等待被唤醒
        LockSupport.park(this); 
  // 在线程被唤醒时,被唤醒后回到 if (p == head && tryAcquire(arg)) 
  // Thread.interrupted 返回当前线程是否被中断,并清除中断标记位。所以当该方法返回true(被中断)时,需要手动补偿中断标记位。
        return Thread.interrupted(); 
    }

结合刚刚分析的 acquireQueued() 流程。当前线程被中断过,则执行selfInterrupt(); ,否则不会执行。

也就是说在 acquireQueued() 流程中,即使是线程在阻塞状态被中断唤醒而获取到 cpu 执行权利,但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。该线程会再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒,线程被唤醒后会继续尝试获取锁,如果获取锁成功时 acquireQueued() 流程会返回 true。

但在该线程成功获取锁成功之前,它的中断会被忽略并且中断标记会被清除。

这是因为在 parkAndCheckInterrupt() 中,我们线程的中断时调用了 Thread.interrupted()

该函数不同于 Thread 的 isInterrupted() 函数,isInterrupted() 仅仅返回中断状态,而interrupted() 在返回当前中断状态之后,还会清除中断标记。

正因为之前的中断标记被清除了,所以这里需要调用 selfInterrupt() 重新产生一个中断。

如果你对 interrupt() 和 interrupted() 的使用还有疑惑,可以参考这篇文章

interrupt() VS interrupted() VS interrupted()》)

非公平锁

   final void lock() {
             // CAS设置state成功(获取锁成功)将当前线程设置为独占线程
            if (compareAndSetState(0, 1)) 
                setExclusiveOwnerThread(Thread.currentThread());
     // CAS设置state失败(获取锁失败)调用AQS#acquire(1)方法获取同步状态
            else 
                acquire(1);
        }

公平锁

 final void lock() {
               // 调用AQS#acquire(1)方法获取同步状态
            acquire(1);
        }

这里笔者也画了个流程图,帮助大家理解

这里是Lock流程图的链接,点这里

2.2 释放锁

上文对申请锁流程做了解读,下面我们来看 RenntrantLock 是如何释放锁的。

RenntrantLock 的 unlock 方法调用了 sync.release(1);

public void unlock() {
        sync.release(1);
    }

下述代码是 AQS 中 relsease 的实现

   public final boolean release(int arg) {
     // 尝试释放锁
        if (tryRelease(arg)) {  
          // 释放成功则获取头节点Head
            Node h = head; 
          // 如果头节点不为空且waitStatus不为0,说明需要唤醒第一个未取消的后继节点
            if (h != null && h.waitStatus != 0) 
              // 唤醒第一个未取消的后继节点
                unparkSuccessor(h); 
          // 否则释放锁成功
            return true; 
        }
        return false;
    }

relsease 方法它实际上做了这么几件事

(1)尝试释放锁

RenntrantLock 实现了 tryRelease 方法

首先判断了当前线程是不是锁的持有者,如果不是则抛出异常,否则判断锁剩余占有次数,如果为 0,则说明锁已释放,需要唤醒阻塞队列中其它排队线程。

这里需要注意:如果一个锁被同一个线程重入 n 次,那对应也要释放 n 次。当持有次数为 0 时,表示可以释放锁

 protected final boolean tryRelease(int releases) {
                       // 用当前所的重入次数减去本次释放的次数
            int c = getState() - releases; 
                        // 当前线程是不是锁的持有者
            if (Thread.currentThread() != getExclusiveOwnerThread())
              // 不是则异常
                throw new IllegalMonitorStateException();
                   // 当前线程是锁的持有者,是否完全释放锁
            boolean free = false; 
   // c==0,代表完全释放(如果一个锁被同一个线程重入 n 次,那对应也要释放 n 次。当持有次数为 0 时,表示可以释放锁),其他线程可以获取同步状态了
            if (c == 0) { 
              // 标记是否成功
                free = true; 
              // 并将锁的占用者置空
                setExclusiveOwnerThread(null); 
            }
                   // 更新锁的状态(持有次数)
            setState(c);
            return free;
        

(2)tryRelease(arg) 尝试获取锁成功,需要唤醒阻塞队列中下一个线程

这里需要注意,如果使用的是非公平锁,新的线程此时可以直接获取到锁,插队,唤醒的线程没获取到锁就需要进入阻塞队列中排队。

这里的判断条件为什么是h != null && h.waitStatus != 0 呢?

  • h null Head还没初始化。初始情况下,head null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。

  • h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。

  • h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。

从阻塞队列中查找下一个待唤醒的线程对应的是 unparkSuccessor 方法

// 入参是头节点  
private void unparkSuccessor(Node node) {
    // 当前节点状态
        int ws = node.waitStatus; 
    // 如果head节点当前waitStatus<0, CAS将其修改为0
        if (ws < 0) 
            compareAndSetWaitStatus(node, ws, 0);

    // 当前节点后继节点
        Node s = node.next; 
   //  唤醒后继节点,从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
    // 后继节点为空或者状态>0 (超时或中断)
        if (s == null || s.waitStatus > 0) { 
            s = null;
          // 从链表的尾部开始寻找,找到头节点后面的第一个非取消节点。
            for (Node t = tail; t != null && t != node; t = t.prev) 
               // 从tail寻找可用节点
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
          // 唤醒节点线程, 线程被唤醒后会继续去竞争锁。
            LockSupport.unpark(s.thread); 
    }

上述 void unparkSuccessor(Node node) 方法做了这么几件事

(1)确认头节点的状态。如果头节点的状态不为 0,则更新为 0

(2)入参是头节点,在释放锁时,需要找到头节点下一个未取消的节点

(3)从链表的尾部开始寻找,找到头节点后面的第一个非取消节点。

这里说明一下,因为我们在维护节点的前驱节点时使用了 CAS,通过前驱节点遍历是可靠的,不会遗漏节点。

这里你可能会有疑问为什么是从 tail 尾节点开始寻找,而不是从 node.next 开始呢?

之所以从尾部开始遍历,是为了能保证遍历到所有的节点。

比如,当前 node 是前 tail 节点,新的 node2 正在变成 tail 节点,但是 addWaiter pred.next = node; 并不是原子操作,很可能这步还未来得及执行,如果正向遍历,node.next,会为null,就会遗漏新加入的节点,但是从 tail 开始遍历肯定没问题,因为在设置 tail 节点时,compareAndSetTail(pred, node) 是原子操作

(4)找到对应的节点,调用 LockSupport.unpark 方法唤醒线程。线程被唤醒后会继续去竞争锁。这里唤醒的是申请锁时用 LockSupport.park 阻塞的线程,因为这样可以让锁的申请和释放形成闭环通道。

唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 刚刚线程被挂起在这里了
    return Thread.interrupted();
}

之后会走到这个逻辑 acquireQueued(final Node node, int arg),这个时候,node的前驱是head。

这里笔者也画了个流程图,帮助大家理解

这里是解锁流程图的链接,点这里

3. 小结

简单总结下

其实在 AQS 中维护着一个 CLH 的阻塞队列

  • 当线程获取同步状态失败后,则会加入到 CLH 阻塞队列的队尾,并一直保持着自旋。

  • 在 CLH 阻塞队列中的线程在自旋时,会判断其前驱节点是否为首节点,如果为首节点则不断尝试获取同步状态,获取成功则退出 CLH 阻塞队列。

  • 当线程执行完逻辑后,会释放同步状态,释放后会唤醒其后继节点。

4. 参考

https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

0

评论区