AQS源码阅读

虚幻大学 xuhss 349℃ 0评论

? 优质资源分享 ?

学习路线指引(点击解锁) 知识定位 人群定位
? Python实战微信订餐小程序 ? 进阶级 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
?Python量化交易实战? 入门级 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

简介

AQS 全程为 AbstractQueuedSynchronizer , 在 java.util.concurrent.locks包下的一个抽象类。

类的具体作用以及设计在开始类描述信息里面就有很好的表达

Provides a framework for implementing blocking locks and related
synchronizers (semaphores, events, etc) that rely on
first-in-first-out (FIFO) wait queues.  This class is designed to
be a useful basis for most kinds of synchronizers that rely on a
single atomic {@code int} value to represent state. Subclasses
must define the protected methods that change this state, and which
define what that state means in terms of this object being acquired
or released.  Given these, the other methods in this class carry
out all queuing and blocking mechanics. Subclasses can maintain
other state fields, but only the atomically updated {@code int}
value manipulated using methods {@link #getState}, {@link
#setState} and {@link #compareAndSetState} is tracked with respect
to synchronization. 
具体翻译为:
提供了实现阻塞锁和相关的框架,依赖于的同步器(信号量、事件等)。
先进先出 (FIFO) 等待队列。 这个类被设计为对于大多数依赖于单个原子 {@code int} 值来表示状态。 
子类必须定义更改此状态的受保护方法,以及根据正在获取的对象定义该状态的含义或释放。 鉴于这些,此类中的其他方法带有排除所有排队和阻塞机制。 子类可以维护其他状态字段,但只有原子更新的 {@code int}使用方法 {@link #getState}、{@link 操作的值#setState} 和 {@link #compareAndSetState} 被地跟踪、同步。

简单描述为:通过原子性的int值来标记同步状态,实现阻塞锁机制,基于FIFO队列来实现排队机制

AQS实现了两种模式,独占模式和共享模式,实现共享锁和排他锁

以独占锁为例,FIFO队列头节点获取到锁之后,其他节点会进入等待状态,释放锁之后,头节点的下一个节点会尝试获取锁,但是不一定成功(公平、非公平)。

LockSupport工具类

LockSupport 官方是这样描述的 Basic thread blocking primitives for creating locks and other , 翻译为 “基本线程阻塞原语,用于创建锁和其他”,可以理解为使用计算机原语来创建锁,底层实现为unsafe类,主要方法为pack(线程阻塞)与unpack(唤醒线程),源码如下:

    // LockSupport设置阻塞分为两组,
    // 一组是不设置blocker的方法,另一组是设置blocker方法
    // JDK推荐为Thread设置blocker方便调试
    public static void park(Object blocker) {
        // 获取当前线程
        Thread t = Thread.currentThread();
        // blocker位线程内存当前偏移量,设置线程内存偏移量,
        setBlocker(t, blocker);
        // Unsafe包下的方法可以直接操作内存,设置线程阻塞
        U.park(false, 0L);
        // 线程唤醒后 blockers设置为空
        setBlocker(t, null);
    }
    public static void park() {
        // 线程阻塞,单纯唤醒,不会记录偏移量
        U.park(false, 0L);
    }
    // 与park基本相同,添加阻塞事件,自动唤醒
    public static void parkNanos(Object blocker, long nanos);
    public static void parkUntil(Object blocker, long deadline);  
    // unpark方法为unsafe.unpark方法,为C++实现,基于内存进行操作
    public static void unpark(Thread thread) {
        if (thread != null)
            U.unpark(thread);
    }

AQS内部结构

AQS类图

23b304a416faabc0db251187f6a6ca66 - AQS源码阅读

AQS的实现类为NofairSync(非公平)、FairSync(公平)两种。

内部类Node

    static final class Node {
      // 共享
      static final Node SHARED = new Node();
      // 独占
      static final Node EXCLUSIVE = null;
      // 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
      static final int CANCELLED =  1;
      // 后继节点的线程状态处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点线程继续运行
      static final int SIGNAL    = -1;
      // 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
      static final int CONDITION = -2;
      // 表示下一次共享式同步状态获取将会无条件地传播下去
      static final int PROPAGATE = -3;
      // 等待状态
      volatile int waitStatus;
      // 前驱结点
      volatile Node prev;
      // 后继节点
      volatile Node next;
      // Node封装当前线程
      volatile Thread thread;
      // 指向ConditionObject的下一个节点
      Node nextWaiter; 
    }

节点状态

  1. CANCELLED (1):当前线程因为超时或者中断被取消。这是一个终结态,也就是状态到此为止
  2. SIGNAL (-1):当前线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程。这个状态一般都是后继线程来设置前驱节点的
  3. CONDITION (-2):当前线程在condition队列中
  4. PROPAGATE (-3):用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点成为头节点之前,是不会跃迁为此状态的

主要属性

/**
 * 等待队列的头节点, 赖加载
 * 除了初始化之外, 只能通过 setHead 方法来改变其值
 * 如果 head 不为 null, waitStatus 值就一定不会是 CANCELLED
 */
private transient volatile Node head;

/**
 * 等待队列的尾结点, 懒加载
 * 只能通过 enq 方法添加新节点时才会去改变尾结点
 */
private transient volatile Node tail;

/**
 * 同步器的状态
 * 以 ReentrantLock 为例, 0 表示可以获取到锁, 其他的正整数表示无法获取到锁
 */
private volatile int state;

AQS属性结构设计如下

537d8eae55c2587861b36b521a2eb55f - AQS源码阅读

具体方法

开放实现接口API,用于场景自定义:

方法 作用
boolean tryAcquire(int arg) 试获取独占锁
boolean tryRelease(int arg) 试释放独占锁
int tryAcquireShared(int arg) 试获取共享锁
boolean tryReleaseShared(int arg) 试释放共享锁
boolean isHeldExclusively() 当前线程是否获得了独占锁

AQS本身将同步状态的管理用模板方法模式都封装好了,以下列举了AQS中的一些模板方法

方法 描述
void acquire(int arg) 获取独占锁。会调用tryAcquire方法,如果未获取成功,则会进入同步队列等待
void acquireInterruptibly(int arg) 响应中断版本的acquire
boolean tryAcquireNanos(int arg,long nanos) 响应中断+带超时版本的acquire
void acquireShared(int arg) 获取共享锁。会调用tryAcquireShared方法
void acquireSharedInterruptibly(int arg) 响应中断版本的acquireShared
boolean tryAcquireSharedNanos(int arg,long nanos) 响应中断+带超时版本的acquireShared
boolean release(int arg) 释放独占锁
boolean releaseShared(int arg) 释放共享锁
Collection getQueuedThreads() 获取同步队列上的线程集合

源码层面上会对acquire、release、acquireShared 、releaseShared 进行详解

独占锁

// 获取独占锁 
public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 试获取独占锁,获取同步状态 
            // addWaiter,将线程封装成Node节点,添加到同步队列的尾部,acquireQueued,自旋等待,如果获取状态失败,挂起线程
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
            selfInterrupt(); // 线程中断
}

private Node addWaiter(Node mode) {
    //创建Node节点
    Node node = new Node(mode);

    for (;;) {
        // 当前队尾进行标记
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            // CAS操作放到队尾
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}
// 将该线程加入等待队列的尾部,并标记为独占模式
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        // 自旋,不断尝试当前线程获取锁,知道成功或者线程被打断
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 尝试获取锁
            if (p == head && tryAcquire(arg)) {
                // 成功之后将节点设置为新的头部节点
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            // 获取锁失败
            // 判断线程是否需要中断, 判断标准是前节点状态
            if (shouldParkAfterFailedAcquire(p, node))
                //挂起当前线程
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}
// 如果线程获取同步状态失败就要检查它的节点status,要保证prev = node.prev
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前驱节点状态
    int ws = pred.waitStatus;
    // singal 表示前驱节点释放后会唤起当前线程,会继续等待,那么线程可以中断等待
    if (ws == Node.SIGNAL)
        return true;
    // 前驱节点不是Head节点,并且状态为CANCELLED,那么就需要剔除前置节点,向前遍历,直到找到合适的前置
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 这种情况表示前驱节点的 ws = 0 或者 ws = PROPAGATE,那么设置前驱节点为singal,之后重新循环获取锁
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

acquire方法为获取独占锁

  1. tryAcquire() 尝试直接去获取资源,如果成功则直接返回true,AQS中空方法,由实现类来实现,具体分析会在ReentrantLock中进行详细代码分析。
  2. addWaiter() 将该线程加入等待队列的尾部,并标记为独占模式
  3. acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上
// 独占锁释放节点
public final boolean release(int arg) {
    // tryRelease由实现类具体实现
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒下一个节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release方法通过tryRelease()实现了扩展性,来进行锁的重入设计,unparkSuccessor则自身实现,完成了下一个Node节点的唤醒

共享锁

public final void acquireShared(int arg) {
    // tryAcquireShared为抽象方法,由子类实现
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}   

private void doAcquireShared(int arg) {
    // 不断尝试从队列中获取共享锁,当成功获取到锁或者线程被打断时会成功退出循环,竞争锁失败的线程会被 park 直到被唤醒,唤醒之后会再次进入循环尝试去获取锁,不断的重复整个过程
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            // 上面由详细解读
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

// 释放共享锁
public final boolean releaseShared(int arg) {
    // 子类实现
    if (tryReleaseShared(arg)) {

        doReleaseShared();
        return true;
    }
    return false;
}
//当前节点调用
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果头节点状态为SIGNAL, 表示可以去释放锁
            if (ws == Node.SIGNAL) {
                // 通过 cas 将 waitStatus 设为 0
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 线程唤醒
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

关于独占锁和共享锁的异同

相同

1)获取锁前都会判断是否有权限,只有满足条件才可能获取到锁

2)未获取到锁的线程会创建新节点放入队列尾部

不同

1)独占锁只会释放头部后节点的线程,而共享锁会依次释放所有线程

2)独占锁存在非公平锁的情况,新的线程可能抢占队列中线程的锁,共享锁则不存在这种情况

扩展

AQS使用了模板方法的设计模式,以固定的方法进行调用,子类对其实现,方便扩展,后续会对具体实现类继续读源码。

转载请注明:xuhss » AQS源码阅读

喜欢 (0)

您必须 登录 才能发表评论!