侧边栏壁纸
博主头像
灬编程小子博主等级

写程序就是一个不断追求完美的过程

  • 累计撰写 4 篇文章
  • 累计创建 1 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录
JUC

AQS独占模式实现原理

灬编程小子
2023-03-10 / 0 评论 / 0 点赞 / 77 阅读 / 28902 字

0. 序

在 Java 中锁的使用有两种方式,一种是通过 JVM 虚拟机层面提供的 synchronized 关键字,另一种则是使用 JUC 框架,而 AQS(全称 AbstractQueuedSynchronizer)是 JUC 大部分同步组件(例如赫赫有名的 RenntrantLockSemaphore)的基础。

AQS 提供了原子式管理同步状态,阻塞和唤醒线程功能以及队列模型的框架,它非常之常用和强大,其实比较简单,但对于初学者来说,理解起来会有些许难度。

我们都知道 RenntrantLock 是一种独占锁,所以本文会通过 ReentrantLock 为切入点,来了解 AQS 是如何被使用的,AQS 的独占模式是如何实现的,接着通过解读源码来剖析 AQS 的实现原理,深入 AQS,吃透 AQS。

1. 前置知识

文章开篇让我们先掌握几个前置知识点,帮助理解下文

公平锁 VS 非公平锁

举个生活中的例子,食堂内排队买饭,人们秩序有条,一个接一个的排队。

非公平顾名思义,你排的好好的,突然有个人来插你的队,排到了你前面,这就不公平了。

也就是说,线程 A,线程 B 在 AQS 队列中排队,如果线程 C 老老实实的排队,就是公平锁,如果线程 A

锁释放,线程 C 抢到锁时机在线程 B 出队过程中,线程 A 唤醒线程 B 之前,线程 C 就排到了线程 B 的前面,插队了,就是非公平锁。

重入锁 VS 不可重入锁

重入就是说是否同一个线程支持反复加锁释放锁,同一线程加锁几次就需要释放锁几次。

独占模式锁 VS 共享模式锁

所谓独占锁,就是只要有一个线程加锁,其他人都得干等着,这把锁属于某个线程独占,这就是独占锁。

所谓共享锁,就是可以和别的线程同时持有一把锁,比如读写锁,线程 A 加了读锁,线程 B 还是可以加读锁的,它们共享一把锁,这就是共享锁。

AQS类描述

AbstractQueuedSynchronizer 是 AQS 体系的核心基类(抽象类),使用了模版设计模式,它定义了锁的基本行为,实现了锁的基本存储结构,下面我们来一起了解下。

AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizer,此类比较简单,可以看到 transient 修饰的独占线程字段并提供了 set,get 方法获取,用于记录锁当前的持有者线程。

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
<pre><code>private static final long serialVersionUID = 3737899427754241961L;

protected AbstractOwnableSynchronizer() { }

  // 独占线程
private transient Thread exclusiveOwnerThread; 

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

protected final Thread getExclusiveOwnerThread() {
    return exclusiveOwnerThread;
}

}

回到 AbstractQueuedSynchronizer 类,来看其内部结构,它使用了是类模版设计模式,实现了锁的基本存储结构,定义了锁的行为。

State

它定义了字段 state,代表当前锁的状态,并用 volatile 关键字修饰,来应对并发场景

private volatile int state; 

这里先说明下,state 为 0 代表当前锁并未被任何线程占用,state 大于 0 则代表有线程持有当前锁,其中 state 大于 1 表示重入。

提供了针对 state 的三个方法,其中值得一提的是 compareAndSetState 方法,通过 CAS 方式更新 state,来保证原子性。

 // 获取state<br />
protected final int getState() {
return state;
}</p>
<p>// 设置state
protected final void setState(int newState) {
state = newState;
}</p>
<p>// CAS更新state 该方法能够保证状态设置的原子性
protected final boolean compareAndSetState(int expect, int update) {<br />
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS 加锁和释放锁达到同步状态实际上就是通过 CAS 对 State 值进行修改达到。

这里简单带过,后续会做讲解。

CAS 是什么

要知道 volatile 只能保证可见性,无法保证原子性,查询并更新 state 的值并不是一个原子性操作,在多线程环境中并不安全。

而操作系统提供了一个新的 CPU 指令(CAS),CompareAndSwap,即比较并替换。具体的原理是在更新一个值之前,首先比较这个值是否发生了变化,如果确实发生了变化,那么就会更新失败,否则更新成功

AQS 的 CLH

接着继续来看 AQS 内部结构,你会发现这么两个字段,结构同为 Node 的 head 和 tail,见名知意,一个头节点,一个尾节点。


private transient volatile Node head;</p>
<p>private transient volatile Node tail;

Node 结构的定义如下:

可以看到 Node 中封装了线程 thread,并维护了前驱节点 prev,和后继节点 next,从而构造了一个 FIFO 的双向队列,同时定义了 waitStatus 表示当前节点在队列中的等待状态并提供了 waitStatus 的一些可能值的定义。

其实这里就大概能看出,AQS 是通过将每个请求共享资源的线程封装到一个 Node 节点中,通过 waitStatus 状态来实现锁的分配,prev,next 用于构建同步等待队列(阻塞队列),而 nextWatier 用于构建条件等待队列,这里需要注意的是阻塞队列是不包含 head 的。

// 双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。<br />
static final class Node {</p>
<pre><code>    // 节点当前以共享模式等待锁
    static final Node SHARED = new Node(); 
    
         // 节点当前以独占模式等待锁
    static final Node EXCLUSIVE = null; 

         // waitStatus状态为1,表示当前线程取消了争抢这个锁
    static final int CANCELLED =  1; 
   
         // waitStatus状态为-1,其表示当前node的后继节点需要被唤醒。也就是说这个waitStatus其实代表的不是自己的状态,而是后继节点的状态
    static final int SIGNAL    = -1; 
 
    // waitStatus状态为-2,表示当前节点在等待队列中,等待唤醒
    static final int CONDITION = -2;
  
          // waitStatus状态为-3,共享模式狭隘释放锁才会使用
    static final int PROPAGATE = -3; 

          // 当前节点在队列中的等待状态
    volatile int waitStatus; 
   
          // 前驱指针,当节点添加到同步队列时被设置(尾部添加)
    volatile Node prev; 
      
          // 后继指针
    volatile Node next; 

          // 当前处于该节点的线程
    volatile Thread thread; 

          // 指向下一个处于Condition状态的节点
    Node nextWaiter; 

}

这里笔者画了个图帮助各位小伙伴理解一下

AQS-独占-1.png

看了上图之后,你也许很好奇线程 t1 所在的节点的前驱节点为什么是空节点呢,它的作用是什么,请先保留疑问,带着疑问,继续往下看。

AQS 方法定义

翻看 AQS 类代码的过程中,你可能会发现 AQS 作为被 abstract 修饰的抽象类,却没有抽象方法,而是单纯用 protected 修饰,这是为什么呢?

要知道上文我们知道 AQS 是 JUC 大部分同步组件的基类,它并不建议你直接 new AQS 来使用,且如果方法修饰了 abstract 则需要每个子类去实现,这是不合理的,所以它让子类按需实现。

而 AQS 中被 public 修饰的方法可以认定为模版方法,不建议子类覆盖实现。

下面来看下,AQS 被使用最多的方法定义:

方法名

描述

protected boolean isHeldExclusively()

该线程是否正在独占资源。只有用到 Condition 才需要去实现它。

protected boolean tryAcquire(int arg)

独占方式。arg 为获取锁的次数,尝试获取资源,成功则返回 true,失败则返回 false。

protected boolean tryRelease(int arg)

独占方式。arg 为释放锁的次数,尝试释放资源,成功则返回 true,失败则返回 false。

protected int tryAcquireShared(int arg)

共享方式。arg 为获取锁的次数,尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

protected boolean tryReleaseShared(int arg)

共享方式。arg 为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。

上述表格你会发现,tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 分别 2 对,一对实现独占,一对实现共享。也就是说 AQS 支持自定义同步组件同时实现独占和共享两种方式,非常强大。

这里你只需要大概清楚上表中每个方法的作用即可,后续会做详细讲解。

相信我们大家或多或少都有使用过 ReentrantLock,Semaphore,两者均为基于 AQS 的实现,

  • ReentrantLock实现了 AQS 的独占模式

  • Semaphore实现了 AQS 的共享模式

下面我们将通过解读这俩常用组件源码,来剖析 AQS 的原理。

2. ReentrantLock

ReentrantLock 实现了 AQS 的独占模式,它是一个可重入锁,同时提供了公平锁非公平锁的实现。

来看结构,由于 Java 不支持多继承,所以由 ReentrantLock 继承抽象类 Lock,用内部类的方式继承 AQS,ReentrantLock 在具体实现锁时基本都是委托内部类 Sync,而 Sync 又继承自 AQS。Sync 内部有两个子类,分别是 FairSync(公平锁)与 NonfairSync(非公平锁)。

AQS-独占-2.png

2.1 申请锁

来看加锁流程,首先来看 ReentrantLock 支持超时时间的锁申请

  public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

调用 Sync 的 tryAcquireNanos 方法

// 独占模式,忽略中断
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果线程的中断位标记为 true,表示应用方主动放弃锁的申请,可以直接抛出中断异常,结束锁的申请
if (Thread.interrupted())
throw new InterruptedException();
// 调用 Sync 的 tryAcquire 尝试获取锁。如果返回 true,表示成功获取锁,可以直接返回;不然就调用 doAcquireNanos,进入锁等待队列
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

tryAcquire 方法在具体的子类中实现

该方法返回 true 则表示获取到同步状态锁

  1. 没有线程在等待锁

  2. 重入锁,线程本来就持有锁

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

先来看 ReentrantLock 公平锁的实现,它在 FairSync 类中。

公平锁实现

  protected final boolean tryAcquire(int acquires) {
// 确保获取当前申请锁的线程
final Thread current = Thread.currentThread();
// state 字段的含义是当前锁的重入次数
int c = getState();
//如果 state 为 0,表示当前锁并没有被占用 说明此时此刻锁是可以用的
if (c == 0) {
// 判断阻塞队列中有没有其他线程在排队
if (!hasQueuedPredecessors() &&
// CAS获取锁,失败说明刚刚同一时刻其他线程抢先了 因为刚刚判断 c == 0 证明没有锁被占用
compareAndSetState(0, acquires)) {
// 获取到锁,标记独占模式,占有锁
setExclusiveOwnerThread(current);
return true;
}
}
// 锁重入场景 不存在并发问题
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

首先获取当前线程,确保获取到当前申请锁的线程,

接着获取锁的当前状态,state,也就是我们上文提到的 state 为 0 代表当前锁并未被任何线程占用,state 大于 0 则代表有线程持有当前锁,其中 state 大于 1 表示重入。

上述代码流程中,如果 state 不为 0,表示锁已经被占用,但因为 ReentrantLock 支持可重入性,所以如果当前线程是锁的持有者,则只需要更新 state 的值,而不用 CAS 更新,否则就返回 false,这个线程最终会调用 doAcquireNanos,进入到同步阻塞队列,排队获取锁。

上述代码流程中,如果 state 为 0 则说明当前锁并没有被占用,则需要判断阻塞队列中有没有其他线程在排队,如果有,公平锁此时无法竞争锁,返回 false,尝试获取锁失败。这个线程最终会调用 doAcquireNanos,进入到同步阻塞队列,排队获取锁。

阻塞队列中没有其他排队的其他线程,则进行 CAS 获取锁,如果获取锁成功,则标记独占模式,占有锁。

来详细看下 hasQueuedPredecessors 方法的实现

实际上它做了这么一件事,判断当前线程是否位于 CLH 阻塞队列的第一个,是则返回true,否则返回false

  public final boolean hasQueuedPredecessors() {
// 尾结点
Node t = tail;
// 头节点
Node h = head;
Node s;
// 头节点 != 尾结点 且 同步队列第一个节点不为 null 且 当前线程不是阻塞队列第一个节点
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

非公平锁实现

来看 tryAcquire 非公平锁的实现,它在 NonfairSync 类中。

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires); // 非公平锁获取锁
}

其实我们对比公平锁,可以发现代码几乎一致

不同点在于,非公平锁会首先和阻塞队列中的线程竞争,如果竞争成功,可以直接获取锁。

 final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 和公平锁的不同点
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

进入阻塞队列

上文中我们了解到,公平锁和非公平锁的 tryAcquire,未获取锁成功,则会调用 doAcquireNanos,进入锁等待队列。

 static final long spinForTimeoutThreshold = 1000L;</p>
<p>private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 是否已超时
if (nanosTimeout <= 0L)<br />
// 超时返回false 表示锁获取失败
return false;
final long deadline = System.nanoTime() + nanosTimeout;
// 独占模式 入队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 循环
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是Head则尝试再次获取锁 如果成功获取锁走if里面的逻辑
if (p == head && tryAcquire(arg)) {
// 将获取锁的节点设置为Head节点
setHead(node);
// help GC
p.next = null;
failed = false;
// 成功获取锁,返回true
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 如果超时,获取锁失败
if (nanosTimeout <= 0L)
return false;
// 没超时继续锁的申请流程 判断是否需要挂起当前线程
// 走到这里了要么当前node本来就不是队头,要么就是tryAcquire(arg)没有抢赢别人<br />
if (shouldParkAfterFailedAcquire(p, node) &&
// 获取锁的剩余时间是否大于spinForTimeoutThreshold 如果是则通过自旋方式再次执行获取锁相关的逻辑。,否则将使线程阻塞
// spinForTimeoutThreshold 默认为 1s,避免线程切换带来的额外开销,因为如果本次锁申请距超时还剩不到 1s,就没有必要再阻塞线程,
nanosTimeout > spinForTimeoutThreshold)
// 带超时时间的线程阻塞
LockSupport.parkNanos(this, nanosTimeout);
// 否则检查线程的中断标记,是否被中断
if (Thread.interrupted())
// 中断,抛出中断异常,获取锁失败,返回false
throw new InterruptedException();
}
} finally {
if (failed)
// 将阻塞队列中对应节点的状态设置为CANCEL
cancelAcquire(node);
}
}

上述 boolean doAcquireNanos(int arg, long nanosTimeout) 代码实际做了这么几件事

(1)判断获取锁是不是已经超时,如果是,返回 false。

(2)以独占模式调用 addWaiter 加入阻塞队列(下文会详细解释)。

(3)获取节点的前驱节点,如果节点的前驱节点为头节点,再次调用 tryAcquire 方法尝试获取锁。

(4)如果成功获取锁,则将当前节点设置为 Head,表示当前它是锁的持有者。

 private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

这里你可能会有疑惑,为什么头节点持有的线程对象是空呢?

也就是说头节点是锁的持有者,但实际上头节点却并不维护线程对象。

因为锁的持有者被记录在了 AQS 的 Thread exclusiveOwnerThread 属性中。

这样做的好处是:在实现非公平锁时,如果锁被新线程抢占,不需要更新头节点。

(5)如果当前节点不是头节点,或者没有成功获取锁,调用 shouldParkAfterFailedAcquire 判断当前线程是否需要阻塞,如果需要阻塞,则调用 LockSupport.parkNanos 阻塞线程。

简单说明下,如果前驱节点的状态为 SINGNAL = -1,则表示当前线程直接进入阻塞队列,排队获取锁。如果前驱节点的状态 > 0 时,对应状态 CANCELLED = 1,说明是已取消的节点,都需要被删除(这里运用了循环将当前节点前面状态 CANCELLED = 1 的连续取消节点删除,直到遇到非取消节点立即停止删除,类似分段思想,在多线程的场景,当前线程为段尾从后向前方向删除连续取消节点,并不影响后续其他线程操作节点,所以是线程安全的)

​ 前驱节点的 waitStatus 不等于 -1 和 1,那也就是只可能是 0,-2,-3,对于 ReentrantLock 来说只可能是 0,如果前驱节点的状态为 0 也就是初始状态,那么需要将前驱节点的状态更新为 SIGNAL = -1,

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

不阻塞线程,而是外层 doAcquireNanos 通过自旋方式再次执行获取锁相关的逻辑。

 // 第一个参数是前驱节点,第二个参数才是代表当前线程的节点<br />
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前一个节点的等待状态
int ws = pred.waitStatus;
// 说明前驱节点状态正常,则当前线程直接进入到阻塞队列,当前线程需要挂起,等待以后被唤醒
if (ws == Node.SIGNAL)
// 表明当前线程可以被park,安全的阻塞等待。
return true;
if (ws > 0) {
// (进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的
do {
// 如果前驱节点的状态大于 0 表明该线程的前驱节点已经等待超时或者取消或者被中断了,则需要从CLH队列中将该前一个节点删除掉,循环回溯,直到前一个节点状态 <= 0 遇到非取消节点立即停止删除 (循环将前面取消的节点都出队)
node.prev = pred = pred.prev;
//  waitStatus > 0 说明前驱节点取消了排队
} while (pred.waitStatus > 0);
pred.next = node;
}
// 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
else {
// 通常情况下,每个新的node入队时,waitStatus都是0,前驱节点是之前的tail,那么它的waitStatus应该是0
// 每个node在入队的时候,都会把前驱节点的状态改为SIGNAL(-1),不阻塞线程,而是再次试图获取锁相关的逻辑。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 这个方法返回 false,那么会再走一次 for 循序,然后再次进来此方法,此时会从第一个分支 ws==Node.SIGNAL 返回 true<br />
return false;
}

(6)否则检查线程的中断标记,是否被中断。

(7)最后将阻塞队列中对应节点的状态设置为 CANCEL


接着来详细看下 addWaiter 方法是如何处理的

tail 节点刚刚我们已经分析过了它是 CLH 双向链表构成的阻塞队列的尾结点,addWaiter 流程简单来说其实就是将线程包装成 Node 节点,方便后续对线程进行阻塞或唤醒,并将 Node 节点添加至 CLH 阻塞队列的最后面。

这里需要注意的是链表的操作在多线程环境中是不安全的,所以这里引入了 CAS 机制,通过 if (compareAndSetTail(pred, node)) 来将自己设置为队尾,更新成功则结束,否则 enq(node) 无限重试直到更新成功。

    private Node addWaiter(Node mode) {
// 创建一个 Node 节点,将线程对象存储在 Node 节点中
Node node = new Node(Thread.currentThread(), mode);
// 记录原尾节点
Node pred = tail;
// 说明有线程已经在该锁上等待
if (pred != null) {
// 添加新节点到队列尾部
node.prev = pred;
// CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴
if (compareAndSetTail(pred, node)) {
// cas成功,原尾结点的下一个节点为新节点
pred.next = node;
// 线程入队了,返回节点
return node;
}
}
// 直到 enq 方法的场景等待队列为空,或者cas失败有线程竞争入队。那么自旋入阻塞队列 则重试直到成功
enq(node);
return node;
}
   private Node enq(final Node node) {
// 无限重试直到成功
for (;;) {
// 记录原尾节点
Node t = tail;
// 队列为空则重新创建为空头尾结点
if (t == null) {
// 此时Node waitstatus为0
if (compareAndSetHead(new Node()))
// tail指向head
tail = head;
}
// 将当前线程排到队尾,有线程竞争的话排不上重试
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

最后如果 doAcquireNanos 获取锁失败则执行 cancelAcquire 将阻塞队列中对应节点的状态设置为CANCEL

finally {
if (failed)
// 将阻塞队列中对应节点的状态设置为CANCEL
cancelAcquire(node);
}
 private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;</p>
<pre><code>    node.thread = null;
 
    Node pred = node.prev;
    while (pred.waitStatus &gt; 0)
        node.prev = pred = pred.prev;

      Node predNext = pred.next;

       node.waitStatus = Node.CANCELLED;

               // 如果node是尾节点,CAS设置为新的尾结点
     if (node == tail &amp;&amp; compareAndSetTail(node, pred)) { 
        // 成功则cas设置pred的下一个节点为null置空
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &amp;&amp;
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws &lt;= 0 &amp;&amp; compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &amp;&amp;
            pred.thread != null) {
            Node next = node.next;
            if (next != null &amp;&amp; next.waitStatus &lt;= 0)
                compareAndSetNext(pred, predNext, next);
        }
       // pred为首节点
       else { 
         // 唤醒 node 的下一个节点的线程等待
            unparkSuccessor(node); 
        }
                // help GC
        node.next = node; 
    }
}

至此 RenntrantLock tryLock 尝试获取锁方法流程结束

这里笔者画了个流程图,帮助大家理解

这里是流程图的链接,点这里


同理其实 RenntrantLock Lock 逻辑类似,对应方法上文都已经解读过,这里不在重复说明了。

public final void acquire(int arg) {
// 尝试获取同步状态,获取成功设置锁状态并返回true,执行业务逻辑,否则获取失败返回false,需子类自己实现,获取成功就不需要进队列排队
if (!tryAcquire(arg) &&
// 生成Node节点,需要把当前线程挂起。加入到CLH阻塞队列,
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 获取到锁,中断线程
selfInterrupt();
}

如果 tryAcquire(int arg) 方法返回 false ,即获取同步状态失败,则调用 addWaiter(Node mode) 方法,将当前线程加入到 CLH 同步队列尾部。并且, mode 方法参数为 Node.EXCLUSIVE ,表示独占模式。

调用 boolean #acquireQueued(Node node, int arg) 方法,自旋直到获得同步状态成功。当返回 true 时,表示在这个过程中,发生过线程中断

也就是满足 parkAndCheckInterrupt 条件(下文会详细介绍这里),设置了 interrupted 标识为 true

  if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;

parkAndCheckInterrupt 中返回了 Thread.interrupted() 这个方法会返回当前线程的中断状态,但是这个方法会清除线程中断的标记

所以在种情况下,外层 acquire 流程中会执行 selfInterrupt() 方法,恢复线程中断的标记

 public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 恢复线程中断的标记
selfInterrupt();
}

下面代码为一个自旋的过程,当前线程节点进入同步队列后,就会进入一个自旋的过程,每个节点都会自省观察,当条件满足,获取到锁后,就可以从这个自旋过程中退出,否则会一直执行下去。

   // acquireQueued方法可以对排队中的线程进行获锁操作。
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 记录过程中,是否发生了线程中断
boolean interrupted = false;
// 开始自旋,要么获取锁,要么中断
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,当前节点在真实数据队列的首部,因为它的前驱是head。注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列,所以当前节点可以去试抢一下锁
if (p == head && tryAcquire(arg)) {
// 获取锁成功,设置当前节点为新的头节点
setHead(node);
// 设置老的头节点p不在指向下一个节点,加速gc
p.next = null;
// 获取同步状态成功
failed = false;
// 返回记录状态
return interrupted;
}
// 走到这里了要么当前node本来就不是队头,要么就是tryAcquire(arg)没有抢赢别人 需要判断是否需要挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
// 负责挂起线程
parkAndCheckInterrupt())
// 不响应中断 如果等待过程中被中断过就将interrupted标记置为true
interrupted = true;
}
} finally {
// 获取同步状态发生异常,取消获取
if (failed)
cancelAcquire(node);
}
}

parkAndCheckInterrupt() 的作用是阻塞当前线程,并且返回线程被唤醒之后的中断状态。
它会先通过 LockSupport.park() 阻塞当前线程。当前驱节点对应的线程使用完锁之后,通过 unpark() 方式唤醒当前线程时,阻塞线程会在此处被唤醒。

private final boolean parkAndCheckInterrupt() {
// 挂起,进入阻塞等待唤醒的状态,等待被唤醒
LockSupport.park(this);
// 在线程被唤醒时,被唤醒后回到 if (p == head && tryAcquire(arg))
// Thread.interrupted 返回当前线程是否被中断,并清除中断标记位。所以当该方法返回true(被中断)时,需要手动补偿中断标记位。
return Thread.interrupted();
}

结合刚刚分析的 acquireQueued() 流程。当前线程被中断过,则执行selfInterrupt(); ,否则不会执行。

也就是说在 acquireQueued() 流程中,即使是线程在阻塞状态被中断唤醒而获取到 cpu 执行权利,但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。该线程会再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒,线程被唤醒后会继续尝试获取锁,如果获取锁成功时 acquireQueued() 流程会返回 true。

但在该线程成功获取锁成功之前,它的中断会被忽略并且中断标记会被清除。

这是因为在 parkAndCheckInterrupt() 中,我们线程的中断时调用了 Thread.interrupted()

该函数不同于 Thread 的 isInterrupted() 函数,isInterrupted() 仅仅返回中断状态,而interrupted() 在返回当前中断状态之后,还会清除中断标记。

正因为之前的中断标记被清除了,所以这里需要调用 selfInterrupt() 重新产生一个中断。

如果你对 interrupt() 和 interrupted() 的使用还有疑惑,可以参考这篇文章

interrupt() VS interrupted() VS interrupted()》)

非公平锁

   final void lock() {
// CAS设置state成功(获取锁成功)将当前线程设置为独占线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
// CAS设置state失败(获取锁失败)调用AQS#acquire(1)方法获取同步状态
else
acquire(1);
}

公平锁

 final void lock() {
// 调用AQS#acquire(1)方法获取同步状态
acquire(1);
}

这里笔者也画了个流程图,帮助大家理解

这里是Lock流程图的链接,点这里

2.2 释放锁

上文对申请锁流程做了解读,下面我们来看 RenntrantLock 是如何释放锁的。

RenntrantLock 的 unlock 方法调用了 sync.release(1);

public void unlock() {
sync.release(1);
}

下述代码是 AQS 中 relsease 的实现

   public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {<br />
// 释放成功则获取头节点Head
Node h = head;
// 如果头节点不为空且waitStatus不为0,说明需要唤醒第一个未取消的后继节点
if (h != null && h.waitStatus != 0)
// 唤醒第一个未取消的后继节点
unparkSuccessor(h);
// 否则释放锁成功
return true;
}
return false;
}

relsease 方法它实际上做了这么几件事

(1)尝试释放锁

RenntrantLock 实现了 tryRelease 方法

首先判断了当前线程是不是锁的持有者,如果不是则抛出异常,否则判断锁剩余占有次数,如果为 0,则说明锁已释放,需要唤醒阻塞队列中其它排队线程。

这里需要注意:如果一个锁被同一个线程重入 n 次,那对应也要释放 n 次。当持有次数为 0 时,表示可以释放锁

 protected final boolean tryRelease(int releases) {
// 用当前所的重入次数减去本次释放的次数
int c = getState() - releases;
// 当前线程是不是锁的持有者
if (Thread.currentThread() != getExclusiveOwnerThread())
// 不是则异常
throw new IllegalMonitorStateException();
// 当前线程是锁的持有者,是否完全释放锁
boolean free = false;
// c==0,代表完全释放(如果一个锁被同一个线程重入 n 次,那对应也要释放 n 次。当持有次数为 0 时,表示可以释放锁),其他线程可以获取同步状态了
if (c == 0) {
// 标记是否成功
free = true;
// 并将锁的占用者置空
setExclusiveOwnerThread(null);
}
// 更新锁的状态(持有次数)
setState(c);
return free;

(2)tryRelease(arg) 尝试获取锁成功,需要唤醒阻塞队列中下一个线程

这里需要注意,如果使用的是非公平锁,新的线程此时可以直接获取到锁,插队,唤醒的线程没获取到锁就需要进入阻塞队列中排队。

这里的判断条件为什么是h != null && h.waitStatus != 0 呢?

  • h null Head还没初始化。初始情况下,head null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。

  • h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。

  • h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。

从阻塞队列中查找下一个待唤醒的线程对应的是 unparkSuccessor 方法

// 入参是头节点<br />
private void unparkSuccessor(Node node) {
// 当前节点状态
int ws = node.waitStatus;
// 如果head节点当前waitStatus<0, CAS将其修改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);</p>
<pre><code>// 当前节点后继节点
    Node s = node.next; 

// 唤醒后继节点,从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的 // 后继节点为空或者状态>0 (超时或中断) if (s == null || s.waitStatus > 0) { s = null; // 从链表的尾部开始寻找,找到头节点后面的第一个非取消节点。 for (Node t = tail; t != null && t != node; t = t.prev) // 从tail寻找可用节点 if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒节点线程, 线程被唤醒后会继续去竞争锁。 LockSupport.unpark(s.thread); }

上述 void unparkSuccessor(Node node) 方法做了这么几件事

(1)确认头节点的状态。如果头节点的状态不为 0,则更新为 0

(2)入参是头节点,在释放锁时,需要找到头节点下一个未取消的节点

(3)从链表的尾部开始寻找,找到头节点后面的第一个非取消节点。

这里说明一下,因为我们在维护节点的前驱节点时使用了 CAS,通过前驱节点遍历是可靠的,不会遗漏节点。

这里你可能会有疑问为什么是从 tail 尾节点开始寻找,而不是从 node.next 开始呢?

之所以从尾部开始遍历,是为了能保证遍历到所有的节点。

比如,当前 node 是前 tail 节点,新的 node2 正在变成 tail 节点,但是 addWaiter pred.next = node; 并不是原子操作,很可能这步还未来得及执行,如果正向遍历,node.next,会为null,就会遗漏新加入的节点,但是从 tail 开始遍历肯定没问题,因为在设置 tail 节点时,compareAndSetTail(pred, node) 是原子操作

(4)找到对应的节点,调用 LockSupport.unpark 方法唤醒线程。线程被唤醒后会继续去竞争锁。这里唤醒的是申请锁时用 LockSupport.park 阻塞的线程,因为这样可以让锁的申请和释放形成闭环通道。

唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 刚刚线程被挂起在这里了
return Thread.interrupted();
}

之后会走到这个逻辑 acquireQueued(final Node node, int arg),这个时候,node的前驱是head。

这里笔者也画了个流程图,帮助大家理解

这里是解锁流程图的链接,点这里

3. 小结

简单总结下

其实在 AQS 中维护着一个 CLH 的阻塞队列

  • 当线程获取同步状态失败后,则会加入到 CLH 阻塞队列的队尾,并一直保持着自旋。

  • 在 CLH 阻塞队列中的线程在自旋时,会判断其前驱节点是否为首节点,如果为首节点则不断尝试获取同步状态,获取成功则退出 CLH 阻塞队列。

  • 当线程执行完逻辑后,会释放同步状态,释放后会唤醒其后继节点。

4. 参考

https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

0

评论区