多线程与高并发(三)—— 源码解析 AQS 原理

虚幻大学 xuhss 289℃ 0评论

? 优质资源分享 ?

学习路线指引(点击解锁) 知识定位 人群定位
? Python实战微信订餐小程序 ? 进阶级 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
?Python量化交易实战? 入门级 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

一、前言

AQS 是一个同步框架,关于同步在操作系统(一)—— 进程同步 中对进程同步做了些概念性的介绍,我们了解到进程(线程同理,本文基于 JVM 讲解,故下文只称线程)同步的工具有很多:Mutex、Semaphore、Monitor。但是Mutex 和 Semaphore 作为低级通信存在不少缺点,Monitor 机制解决了上面部分缺憾,但是仍然存在问题,AQS 的出现很好的解决了这些问题。

二、其他同步工具的缺点

Mutex

  • Mutex 只有两种状态,锁定和非锁定,无法表示临界区的可用资源数量(计数信号量可解决)。
  • 使用复杂,使用不当易造成死锁

Semaphore

  • Mutex 只允许一个线程访问临界资源, Semaphore 允许一定数量的线程访问共享资源。但是 Semaphore 中没有 Owner,无法知道当前获取锁的线程是谁。
  • 使用复杂,使用不当易造成死锁,P V 操作需要配对使用,分散在代码各处增大了编码难度。

Monitor

Monitor 解决了上述几个问题,但在 HotSpot 中底层是基于 Mutex 做的线程同步,在 1.6 之前且还没有进行优化,每次锁竞争都需要经历两次上下文切换严重影响性能。第二个问题是 HotSpot 实现的是精简的 Mesa 语义,不支持多个条件变量。

三、AQS 概述

什么是 AQS ?

AQS 即一个类,java.util.concurrent.AbstractQueuedSynchronized.Class,这个类作为一个构造同步器的框架。AQS 本身并不提供 API 给程序员用于直接的同步控制,它是一个抽象类,通过实现它的抽象方法来构建同步工具,如 ReentrantLock、CountDownLatch 等。也就是说它不是一个面向业务开发使用的工具,是构建同步器所复用的一套机制抽取出来形成的框架,这个框架我们自己也可以通过实现它的抽象方法来构建自己的同步器。

AQS 做了什么事?

其实通过观察同步器的各种实现都是相似的,我们会发现锁的实现通常需要以下三要素:

  • 状态管理
  • 加锁解锁操作
  • 线程等待

状态管理在信号量中是整型值,在 Mutex 中就是 Owner,在 Synchronized (Monitor) 中也是 Owner,这些机制都有对应的操作来加锁和解锁,通常加锁/解锁操作也对应着线程的阻塞(等待)/唤醒,这些线程没有获取到锁后需要等待,有的实现是自旋,有的实现是挂起。不论是 Synchronized 还是 AQS 都有等待队列供未获取到锁的线程进入,直到锁释放。
同理,AQS 所做的主要的三件事也就是上面三件,只是每一项的实现细节可能不同,支持的功能更广泛。

  • 状态的原子性管理
  • 加锁/解锁操作
  • 队列管理(线程阻塞入队/唤醒出队)

下文会先列出 AQS 的设计,也就是实现了哪些特性,接下来就会通过看源码和图示来了解这些设计的具体实现。

AQS 设计

  • 阻塞和非阻塞
  • 可选超时设置,在超时后放弃等待锁
  • 锁可中断
  • 独占和共享模式

独占模式即同一时刻只能有一个线程可以通过阻塞点,共享模式下可以同时有多个线程在执行。
以上都是 AQS 需要支持的功能,基于模板模式的设计,AQS 提供了以下方法供子类继承重新实现:

7979524970a9306900613288ebeacd7d - 多线程与高并发(三)—— 源码解析 AQS 原理

  • tryAcquire()/tryRelease() 为独占模式下的加锁解锁操作
  • tryAcquireShared()/tryReleaseShared() 为共享模式下的加锁解锁操作
  • isHeldExclusively() 表明锁是否为独占模式,即当前线程是否独占资源。

四、AQS 原理

原理概括

关于同步器的实现思路,首先需要有一个状态来标明锁是否可用,在 AQS 的实现中,其维护了一个变量 state,这个变量使用 volatile 关键字进行标识,保证其在线程之间的可见性。加锁解锁操作简化来看就是只需要把这个状态更改,且标明当前线程占有锁。在独占模式下,加锁操作必须互斥,也就是在同一时刻只能有一个线程加锁成功,AQS 使用 CAS 原子指令来保证 State 的互斥访问。在一个线程成功加锁(改变锁的状态 State 的值,该值具体如何改变取决于同步工具如何实现)之后,其他线程尝试 CAS 则会加锁失败,那么加锁失败的线程该如何呢?这些线程可以自旋等待或者阻塞,AQS 提供 CLH 队列将这些阻塞的线程管理起来,CLH 是一个先进先出的队列,加锁失败的线程会被进入队列阻塞。因此加锁和解锁操作并不仅仅是简单的修改锁状态,在这之后还需要维护队列。AQS 的主要原理也就是围绕着队列进行入,加锁解锁功能由继承 AQS 的同步工具实现,在调用加锁操作之后,AQS 来维护线程入队,并且将线程阻塞,在调用解锁操作后,AQS 将队列中的线程按规则出队并且唤醒。

Node 设计

static final class Node {

        static final Node SHARED = new Node();  // 共享模式下的节点

        static final Node EXCLUSIVE = null; // 独占模式下的节点

        static final int CANCELLED =  1; // 被取消了的状态

        static final int SIGNAL    = -1; // 处于park() 状态等待被unpark()唤醒的状态

        static final int CONDITION = -2; // 处于等待 condition 的状态

        static final int PROPAGATE = -3; // 共享模式下需要继续传播的状态

        volatile int waitStatus; // 当前节点的状态

        volatile Node prev; // 前驱节点指针

        volatile Node next; // 后继节点指针

        volatile Thread thread; // 抢占锁的线程

        Node nextWaiter; // 指向下一个也处于等待的节点
    }

上面提到 AQS 有一个阻塞队列,这个队列的具体实现是双向链表,队列中的节点是对线程进行封装后的 Node 类,这个类的每个字段含义如下:

  1. 节点状态,即节点的 waitStatus 值
  • CANCELLED 当前节点因为超时或者被打断而取消,处于这个状态的节点永远不会被阻塞
  • SIGNAL 当前节点的前驱节点被阻塞或者即将被阻塞,所以当前节点必须在释放或者取消的时候对其unpark()
  • CONDITION 当前节点处于 Condition 队列等待,Condition 队列不是 AQS 中的 CLH 队列,这个状态不是用于 CLH 队列的,只有当该节点从 Condition 转移到 CLH 中时,这个状态才会生效(详细内容见下一篇章 ReentrantLock)。
  • PROPAGATE 共享状态下对锁释放的时候需要释放所有获取锁的节点,因而需要这个状态表明还需要继续释放
  • 0 这个值也是waitStatus 的值,没有进行常量定义,其含义为不是以上值
  1. waitStatus 即当前线程现在的状态,值就是上面列出的这些
  2. nextWaiter 是 Condition 中指出下一个也处于等待队列的节点

五、源码分析

为了更好的讲解原理,下面直接模拟线程竞争锁的场景来进行分析,我们首先看两个线程在独占锁模式下加锁和解锁的源码,再看共享模式下的场景。

独占模式加锁

场景:线程 T1 和线程 T2 竞争锁

1. acquire(int arg)

acquire(int arg) 是独占模式下加锁的入口方法

public final void acquire(int arg) {
        // tryAcquire(arg) 是继承 AQS 实现在独占模式下的同步工具需要重写的方法。当这个方法返回true,就表示当前线程获得了锁。
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

流程如下:

  1. 线程 T1 和线程 T2 同时要执行临界区,在执行临界区之前需要进行加锁操作,也就是同步工具实现的加锁方法,加锁方法最终会走到 AQS 的 acquire(int arg) 方法;
  2. 此时,两个线程同时首先都执行 tryAcquire(arg) 操作,该方法在重写的时候通过 CAS 设置状态 State 的值,CAS 保证只能有一个线程成功执行,即只有一个线程能加锁成功,假设线程 T1 加锁成功。
  3. 线程 T1 执行 tryAcquire(arg) 加锁成功,!tryAcquire(arg) 取反操作后 T1 线程不必执行后续逻辑,即加锁成功,开始执行临界区
  4. 此时线程 T2 执行 tryAcquire(arg) 必然失败 (由于 State 的值被修改过,执行 CAS 失败即加锁不成功)
  5. tryAcquire(arg) 返回false , 取反后为 true ,执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6. acquireQueued() 的结果方法分四种情况(具体情况见这个方法的解析),总之最后会返回一个标记表明当前线程是否需要被打断,如果为true,则调用 selfInterrupt() 再次设置打断标记
static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

2. addWaiter(Node mode)

private Node addWaiter(Node mode) {
        // 根据当前线程实例化一个独占式的 Node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        /**
         * 1. 将尾部赋给一个临时变量,注意此时tail为空,因为tail此时还没有指向Node对象
         * 2. tail == null,因此执行 enq(node) 方法,这个node是创建的当前线程的node
         */
 Node pred = tail;
 if (pred != null) {
 node.prev = pred;
 // 这个CAS方法的含义是,判断当前AQS的尾部节点是pred,如果是则重新设置为当前node
 // 设定成功,则退出
 if (compareAndSetTail(pred, node)) {
 pred.next = node;
 return node;
 }
 }
 // 如果挂载队尾节点也存在竞争 则使用无限CAS自旋方式设置队尾
        enq(node);
        return node;
    }

对上述流程第4点拆解来看,T2 首先要执行addWaiter(Node.EXCLUSIVE)进行入队,我们看下执行流程:

  1. 根据当前线程实例化一个独占式的 Node
  2. 将尾部赋给一个临时变量,看官方注释,该指针是懒加载的,在执行enq()方法新入一个节点的时候 tail 才有值,所以此时为 NULL
    1421162813cb5e91b07678ee02f5b149 - 多线程与高并发(三)—— 源码解析 AQS 原理
  3. tail == null,因此执行 enq(node) 方法进行当前节点入队操作,入队成功后返回该节点。

3. enq(final Node node)

private Node enq(final Node node) {
        // 死循环判断
        for (;;) {
            /**
             * 第一次循环:
             *  tail赋值给临时变量t, 注意此时tail仍然是null, 进入if块
             * 调用compareAndSetHead(new Node()) 创建了一个新的节点 ,这个节点的结构如下:
             *         Node() {    // Used to establish initial head or SHARED marker
             *         }
             *   该节点中的线程为 NULL,也就是我们下文所称的哑节点
             *   此时队列里有了第一个Node
             */
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                /**
                 * 第二次循环:在第一次循环时head赋给了tail,此时tail 不为空,
                 * 进入else块,将线程T1入队,也就是维护链表关系
                 */
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq() 算是上述整个流程的一个分支,为后续讲解更清楚,有必要先详细讲解此方法,画出入队情形下的队列图,流程如下:

  1. 注意这是一个 for 循环,在第一次执行的时候,由于 tail 为 NULL,因此进入 if 块创建了一个空节点,空节点的意思是该节点中没有包含线程,如图所示。

e13f1c1d8514d4bafdbf21fe026d8e95 - 多线程与高并发(三)—— 源码解析 AQS 原理

  1. 第二次循环时,tail 即指向没有线程的那个Node,以下称哑节点,此时 tail 不为空,进入 else 逻辑。
  2. 首先更新 T2 Node 的 prev 指针指向哑节点,然后通过 CAS 设置 T2 Node 为尾节点,因为当前尾节点指向的就是哑节点 t,因此 compareAndSetTail(t, node) 可以成功执行,执行成功后,尾节点指针指向 T2 Node。
  3. t.next = node 将哑节点的 next 指针指向 T2。

下图是 T2 入队成功后队列图:
b8eb0ef81e87431764cb0148ee83d4b7 - 多线程与高并发(三)—— 源码解析 AQS 原理

4. acquireQueued(final Node node, int arg)

对上述流程第4点进行拆解,执行完毕 addWaiter() 返回当前节点 T2 Node,接着就进入acquireQueued 方法,我们来看该方法的流程:

final boolean acquireQueued(final Node node, int arg) {
        // 标记自己是否拿到了资源 true为没有获取到。
        boolean failed = true;
        try {
            // 标记等待过程中是否被中断过
            boolean interrupted = false;
            for (;;) {
                // 拿到前驱节点
                final Node p = node.predecessor();
                // 判断上一个节点是否是头部,即当前节点为队列中第二个节点,tryAcquire()尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    // 将自己设置为头结点
                    setHead(node);
                    // 断开原先的哑节点与当前节点的next连接
                    p.next = null; // help GC
                    // 标记已经获取到节点
                    failed = false;
                    // 返回线程中断标记
                    return interrupted;
                }
                /**
 * 判断在一次自旋加锁失败后是否需要睡眠
 * 自旋第一次时shouldParkAfterFailedAcquire(p, node)返回false, 不会进入if块
 * 自旋第二次时,shouldParkAfterFailedAcquire(p, node)返回true,
 * 此时进入parkAndCheckInterrupt()方法,此时线程阻塞在 parkAndCheckInterrupt()
 * 如果线程休息过程中被中断过(Thread.interrupted();), interrupted 返回true
 */
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

该方法有两个标记,一个是 failed 标记是否拿到了资源,注意默认为 true 的时候是没有拿到,另一个是中断标记 interrupted 默认为 false, 该标记的含义不是该线程是否已经被打断过,而是是否需要被打断。我们先看 for 循环中的逻辑再看 finally 块。

  1. 首先拿到当前节点的前驱节点,当前节点就是返回的节点 T2 Node,因此它的前驱节点就是哑节点。
  2. 判断上一个节点是否为头部,如果是通过 tryAcquire(arg) 再次尝试获取锁。CLH 是一个先进先出的队列,第一个节点是哑节点,所以无需竞争锁,也就是只有队列中的第二个节点才有再次竞争锁资源的资格。这里为什么又要调用一次 tryAcquire() 呢?因为这时候线程 T1 有可能已经释放了资源,这里也就是常说的自旋锁,但是只自旋了一次。
  3. 如果这次加锁成功,那么进入 if 块,首先看 setHead()的逻辑,在这里 T2 节点替代了原先的哑节点作为头节点,并且自己成了哑节点。然后断开与之前的哑节点的所有连接,原哑节点等待 GC 回收。
private void setHead(Node node) {
        head = node;
        // 当前节点的线程字段置为空,也就是这个节点替代了原先的哑节点变成了新的哑节点
        node.thread = null;
        // 断开与原哑节点的prev连接
        node.prev = null;
    }

a6ff72ab825f362e28406112be2a5492 - 多线程与高并发(三)—— 源码解析 AQS 原理

  1. failed 标记设置为 false,表示成功获取到资源,然后返回 interrupted 中断标记,此时还是为 false ,也就是无需打断,方法返回之后就可以开始执行临界区了。关于 interrupted 标记待 parkAndCheckInterrupt 方法的时候会细讲,在这次如果直接加锁成功的情况下也就是还没有走下面的 park 阻塞逻辑的时候,该打断标记就是 false。
  2. 如果此次加锁失败,则向下执行,首先执行 shouldParkAfterFailedAcquire(p, node) 方法

5. shouldParkAfterFailedAcquire(Node pred, Node node)

/**
 * 检查当前节点并更新状态,返回是否需要被阻塞
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops. Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status 前置节点
 * @param node the node 当前节点
 * @return {@code true} if thread should block 是否需要被阻塞
 */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前置节点的状态,ws默认值为0
        int ws = pred.waitStatus;
        // SIGNAL默认值为1
        if (ws == Node.SIGNAL)
            /*
 * This node has already set status asking a release
 * to signal it, so it can safely park.
 */
            return true;
        // 前置节点处于取消状态
        if (ws > 0) {
            /*
 * Predecessor was cancelled. Skip over predecessors and
 * indicate retry.
 * 如果前置节点不是正常的等待状态那么就继续往前找直到找到一个正在等待装填的节点。将其后置节点断开接上当前节点。GC会回收一堆相互引用又没有外部引用的节点。
 */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
 * waitStatus must be 0 or PROPAGATE. Indicate that we
 * need a signal, but don't park yet. Caller will need to
 * retry to make sure it cannot acquire before parking.
 */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

接上述流程,T2 线程获取资源失败,因而走此方法逻辑,此时传进该方法的参数 p 是 T2 Node 的前置节点,也就是哑节点(注意因没获取到资源没有进入if块,哑节点还是最先创建的那个哑节点)。这个方法就是在线程没有获取资源的情况下不断自旋直到进入阻塞状态或者是加锁成功,shouldParkAfterFailedAcquire 就是返回一个布尔值判断当前线程是否需要进行阻塞。
340f625ba74629f0a5abb99f490e8473 - 多线程与高并发(三)—— 源码解析 AQS 原理
流程如下:

  1. 拿到哑节点的 waitStatus (以下简称ws)状态,这个状态初始值是 0。
    5b520f75c8468b38eab39c3556d39a9d - 多线程与高并发(三)—— 源码解析 AQS 原理
  2. ws 的值为 0, 所以进入 else 逻辑,将 ws 设置为了 -1 (SIGNAL),返回flase。
  3. 回到 acquireQueued() 方法 因为返回的是 false,&& 的短路性质,没有再执行后一个逻辑,进入下一次循环
    89224804e374e8e56bf8d5e2cfc935d8 - 多线程与高并发(三)—— 源码解析 AQS 原理
  4. 假设 T2 此时仍然没有加锁成功,继续走到这个 shouldParkAfterFailedAcquire() 这个方法中,由于上一次哑节点的状态被修改成了 SIGNAL,这次进入第一个 if 块,返回了 true。
  5. 回到 acquireQueued() 方法,向下执行 parkAndCheckInterrupt(),我们接着看该方法的逻辑。

6.parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
        // 线程阻塞在这,被打断后才会往下执行, 此时打断标记为 true
        LockSupport.park(this);
        // 返回打断标记并重置为false
        return Thread.interrupted();
    }

这个方法比较简单,就是在上一个方法判断要阻塞之后调用这个方法,通过执行 park() 方法将线程阻塞在此处,直到有另一个线程将其唤醒。唤醒的方式有两种,一是调用 unpark() 方法,二是使用 interrupt() 方法置中断标记为 true。T2 Node 阻塞在此处被唤醒后就继续往下走执行 Thread.interrupted(), 该方法会重置中断标记并且给出是否被中断,因此此方法最终会返回一个 bool 值,表明该线程是否需要被打断。
T2 Node 执行到此处则一直阻塞在这了,为了讲清楚这些细节,我们按实际场景讲述,这个线程就阻塞在这了, T2 Node 被唤醒必然是另一个线程对其执行了 unpark() 或者 interrupt() 方法,我们先来看调用 unpark() 的情况,也就是线程 T1 解锁的流程。

独占模式解锁

1. release(int arg)

书接上回,T2 阻塞住了,此时 T1 执行完毕临界区,开始进行解锁,解锁操作首先是恢复锁状态 State 为可用,其中是否重入等细节由继承AQS 的同步工具实现(待讲 ReentrantLock 再讲此处细节),此处略过不表,我们重点关注 AQS 的锁释放原理,解锁过程(独占模式下)最终会调用到 AQS 的 release(int arg) 方法,流程如下:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            // 头结点存在 且头结点状态不为0
            // 如果头结点不为初始化状态则唤醒队列下一个等待的线程
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  1. 首先调用 tryRelease() 方法,和加锁同理,这个方法也由继承 AQS 的同步工具释放,这个函数返回一个布尔值,true 表明释放锁成功,false 解锁失败,T1 线程释放资源后,这个方法必然返回true,于是进入 if 块。
  2. 判断头节点是否存在且头节点状态是否不为0,我把上面 T2 入队的图拿了下来,可以看到这个时候头节点是哑节点,其状态 ws 在 T2 入队的时候被修改了为 -1,此时可以进入if块,执行unparkSuccessor() 方法,入参是哑节点,我们接下来看这个方法的源码。

e29da0a93f3fb7be67ba84004b2639f3 - 多线程与高并发(三)—— 源码解析 AQS 原理

2. unparkSuccessor(Node node)

    private void unparkSuccessor(Node node) {
        /*
 * If status is negative (i.e., possibly needing signal) try
 * to clear in anticipation of signalling. It is OK if this
 * fails or if status is changed by waiting thread.
 * 获取当前节点状态
 */
        int ws = node.waitStatus;
        // 该节点状态是有效的 就将其设置为初始化状态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
 * Thread to unpark is held in successor, which is normally
 * just the next node. But if cancelled or apparently null,
 * traverse backwards from tail to find the actual
 * non-cancelled successor.
 * 从尾节点往前找, 找到一个waitStatus 小于0 的Node
 */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

此时流程如下:

  1. 取出哑节点状态,此时为-1,将其更改为0;
  2. 接着取出下一个节点,在这里哑节点的下一个节点即 T2 Node,T2 不为空,执行 LockSupport.unpark(s.thread) 唤醒 T2 线程。
    注意这里有一个 s== null 的 if 分支,这是因为 next 指针是不可靠的(不可靠的原因以及这一段为什么这么设计可以看这篇【杀手小顾】的回答,我就不班门弄斧了:https://www.zhihu.com/question/50724462/answer/213577684) ,很有可能指向为 NULL 或者是错误的节点,因此需要通过 prev 指针来从尾部开始遍历直到找到最后个可被唤醒的线程,注意这里说的最后一个的意思即是队头开始第一个需要被唤醒的线程(没有调用 break 打破循环),因此并没有破坏队列先进先出的规则。

T2 线程被 unpark() 唤醒

896689a3e29cd11371d54369a4d7c1a2 - 多线程与高并发(三)—— 源码解析 AQS 原理

上面的解锁流程的最后,T2 线程被唤醒,继续往下执行, 注意这里我看其他博客说的有点问题,在被 unpark 唤醒的情况下是不会修改中断标记的,因此这里返回的是 false,所以回到 acquireQueued 方法中并不会执行下面这行代码,关于 unpark() 和 interrupt() 我单独开篇博客讲下

f8c2c000508e6bff1b1a419774bf3dc0 - 多线程与高并发(三)—— 源码解析 AQS 原理

T2 线程被唤醒后就开始有了竞争锁的资格,这时候 for 循环又开始执行,即再次竞争锁,假如这次加锁成功(加锁失败就又继续阻塞,咱就没必要继续讲了,为了闭环我们假设这次加锁成功),那么返回中断标记 interrupted,这个值为 false。
e7b0e28004e8c4c79f44576454f1947d - 多线程与高并发(三)—— 源码解析 AQS 原理
此时也就完成了锁的更替 由 T1 变成了 T2,整个环节就闭环了,在这上面的讲解过程中没有引入更多的线程是为了避免讨论复杂化,在 T3、T4 甚至更多线程来临时同样是遵循的上述代码流程,只是因为多线程并发,这其中哪个线程能获取到锁、链表的维护是否正确,唤醒的时候唤醒的是哪个线程这些事并不可控,因而也没有必要一个个场景来复现,只要弄清楚原理想必足够了。

T2 线程被 interrupt() 唤醒

5e1cb0fc8fa1015314c7aea17aa9a66d - 多线程与高并发(三)—— 源码解析 AQS 原理

  1. 这个流程和上述 unpark() 流程是并列的,承接的是 T2 被阻塞住的流程,T2 阻塞在此处,若 T1 在执行临界区的时候调用了 T2.interrupt() 方法,则也会对 park 解锁,T2 线程就被唤醒了,且中断标记在 interrupt() 执行的时候被置为 true,此方法返回true。
    5e1cb0fc8fa1015314c7aea17aa9a66d - 多线程与高并发(三)—— 源码解析 AQS 原理
  2. 回到 acquireQueued 方法中,interrupted 标记被修改,但是没有立即返回,直到 T2 线程获取到了资源后返回中断标记 true。
    4b5bdfe435164821ca6b2d704d95f5c1 - 多线程与高并发(三)—— 源码解析 AQS 原理
  3. 最终执行到 selfInterrupt() 方法重置一次中断标记,关于中断详情后续再单独开个篇章或者结合 ReentrantLock 中断原理再讲吧。
    014a791c12643fed41c679404c57d694 - 多线程与高并发(三)—— 源码解析 AQS 原理

一些 AQS 设计的疑问

  1. 在被中断打断后为什么还要调用 selfInterrupt() 再中断一次?
  2. 为什么要有dummy 节点(哑节点)?
  3. 为什么线程的状态要保存在上一个节点?
    2和3在上面提到的知乎【杀手小顾】的回答有些解释:https://www.zhihu.com/question/50724462/answer/213577684
  4. 在加锁成功后的出队过程中,为什么要将出对的线程 Node 去替代哑节点成为新的哑节点?直接断开指针,哑节点指向下一个节点会如何呢?

780b82bec60d05f322f002e8c5217780 - 多线程与高并发(三)—— 源码解析 AQS 原理

结语

研读源码过程中发现所需时间耗费过长,在这个以敏捷为王的时代,偶尔也疑虑如此费劲周章地研习源码能有何收获,成本与收获似不成正比。故而 AQS 的源码暂且看到这,想必知晓原理已足够,后续若有闲思再补充共享式加锁和解锁流程细节。

转载请注明:xuhss » 多线程与高并发(三)—— 源码解析 AQS 原理

喜欢 (0)

您必须 登录 才能发表评论!