2026/3/29 16:37:55
网站建设
项目流程
网站图片特效源码,企业信息化管理系统有哪些,做个网站多少费用,有南昌网站优化公司AQS是AbstractQueuedSynchronizer的简称#xff0c;即抽象队列同步器#xff0c;从字面上可以这样理解:抽象#xff1a;抽象类#xff0c;只实现一些主要逻辑#xff0c;有些方法由子类实现#xff1b;队列#xff1a;使用先进先出#xff08;FIFO#xff09;的队列存…AQS是AbstractQueuedSynchronizer的简称即抽象队列同步器从字面上可以这样理解:抽象抽象类只实现一些主要逻辑有些方法由子类实现队列使用先进先出FIFO的队列存储数据同步实现了同步的功能。AQS 是一个用来构建锁和同步器的框架使用 AQS 能简单且高效地构造出应用广泛的同步器比如ReentrantLockSemaphoreReentrantReadWriteLockSynchronousQueueFutureTask 等等都是基于 AQS 的。AQS 的数据结构AQS 内部使用了一个 volatile 的变量 state 来作为资源的标识。/** * The synchronization state. */ private volatile int state;同时定义了几个获取和改变 state 的 protected 方法getState() setState() compareAndSetState()这三种操作均是原子操作其中 compareAndSetState 的实现依赖于 Unsafe 的compareAndSwapInt()方法。AQS 内部使用了一个先进先出FIFO的双端队列并使用了两个引用 head 和 tail 用于标识队列的头部和尾部。其数据结构如下图所示但它并不直接储存线程而是储存拥有线程的 Node 节点。AQS 的 Node 节点资源有两种共享模式或者说两种同步方式独占模式Exclusive资源是独占的一次只能有一个线程获取。如 ReentrantLock。共享模式Share同时可以被多个线程获取具体的资源个数可以通过参数指定。如 Semaphore/CountDownLatch。一般情况下子类只需要根据需求实现其中一种模式就可以当然也有同时实现两种模式的同步类如 ReadWriteLock。AQS 中关于这两种资源共享模式的定义源码均在内部类 Node 中。我们来看看 Node 的结构static final class Node { // 标记一个结点对应的线程在共享模式下等待 static final Node SHARED new Node(); // 标记一个结点对应的线程在独占模式下等待 static final Node EXCLUSIVE null; // waitStatus的值表示该结点对应的线程已被取消 static final int CANCELLED 1; // waitStatus的值表示后继结点对应的线程需要被唤醒 static final int SIGNAL -1; // waitStatus的值表示该结点对应的线程在等待某一条件 static final int CONDITION -2; /*waitStatus的值表示有资源可用新head结点需要继续唤醒后继结点共享模式下多线程并发释放资源而head唤醒其后继结点后需要把多出来的资源留给后面的结点设置新的head结点时会继续唤醒其后继结点*/ static final int PROPAGATE -3; // 等待状态取值范围-3-2-101 volatile int waitStatus; volatile Node prev; // 前驱结点 volatile Node next; // 后继结点 volatile Thread thread; // 结点对应的线程 Node nextWaiter; // 等待队列里下一个等待条件的结点 // 判断共享模式的方法 final boolean isShared() { return nextWaiter SHARED; } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter mode; this.thread thread; } // 其它方法忽略可以参考具体的源码 } // AQS里面的addWaiter私有方法 private Node addWaiter(Node mode) { // 使用了Node的这个构造函数 Node node new Node(Thread.currentThread(), mode); }这里面的 waitStatus 是用来标记当前节点的状态的它有以下几种状态CANCELLED表示当前节点对应的线程已被取消。当等待超时或被中断会触发进入为此状态进入该状态后节点状态不再变化SIGNAL后面节点等待当前节点唤醒CONDITIONCondition中使用当前线程阻塞在Condition如果其他线程调用了Condition的signal方法这个节点将从等待队列转移到同步队列队尾等待获取同步锁PROPAGATE共享模式前置节点唤醒后面节点后唤醒操作无条件传播下去0中间状态当前节点后面的节点已经唤醒但是当前节点线程还没有执行完成。通过 Node 我们可以实现两种队列1一是通过 prev 和 next 实现 CLHCraig, Landin, and Hagersten队列线程同步队列、双向队列。在 CLH 锁中每个等待的线程都会有一个关联的 Node每个 Node 有一个 prev 和 next 指针。当一个线程尝试获取锁并失败时它会将自己添加到队列的尾部并自旋等待前一个节点的线程释放锁。类似下面这样。public class CLHLock { private volatile Node tail; private ThreadLocalNode myNode ThreadLocal.withInitial(Node::new); private ThreadLocalNode myPred new ThreadLocal(); public void lock() { Node node myNode.get(); node.locked true; // 把自己放到队尾并取出前面的节点 Node pred tail; myPred.set(pred); while (pred.locked) { // 自旋等待 } } public void unlock() { Node node myNode.get(); node.locked false; myNode.set(myPred.get()); } private static class Node { private volatile boolean locked; } }2二是通过 nextWaiter 实现 Condition上的等待线程队列单向队列这个 Condition 主要用在 ReentrantLock 类中。AQS 的源码解析AQS 的设计是基于模板方法模式的它有一些方法必须要子类去实现的它们主要有isHeldExclusively()该线程是否正在独占资源。只有用到 condition 才需要去实现它。tryAcquire(int)独占方式。尝试获取资源成功则返回 true失败则返回 false。tryRelease(int)独占方式。尝试释放资源成功则返回 true失败则返回 false。tryAcquireShared(int)共享方式。尝试获取资源。负数表示失败0 表示成功但没有剩余可用资源正数表示成功且有剩余资源。tryReleaseShared(int)共享方式。尝试释放资源如果释放后允许唤醒后续等待结点返回 true否则返回 false。这些方法虽然都是protected的但是它们并没有在 AQS 具体实现而是直接抛出异常protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }获取资源获取资源的入口是acquire(int arg)方法。arg 是要获取的资源个数在独占模式下始终为 1。我们先来看看这个方法的逻辑public final void acquire(int arg) { // tryAcquire 再次尝试获取锁资源如果尝试成功返回true尝试失败返回false if (!tryAcquire(arg) // 走到这代表获取锁资源失败需要将当前线程封装成一个Node追加到AQS的队列中 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 线程中断 selfInterrupt(); }首先调用 tryAcquire 尝试去获取资源。前面提到了这个方法是在子类中具体实现的可以直接进入 ReentrantLock 中 查看。如果获取资源失败就通过addWaiter(Node.EXCLUSIVE)方法把这个线程插入到等待队列中。其中传入的参数代表要插入的 Node 是独占式的。这个方法的具体实现private Node addWaiter(Node mode) { //创建 Node 类并且设置 thread 为当前线程设置为排它锁 Node node new Node(Thread.currentThread(), mode); // 获取 AQS 中队列的尾部节点 Node pred tail; // 如果 tail null说明是空队列 // 不为 null说明现在队列中有数据 if (pred ! null) { // 将当前节点的 prev 指向刚才的尾部节点那么当前节点应该设置为尾部节点 node.prev pred; // CAS 将 tail 节点设置为当前节点 if (compareAndSetTail(pred, node)) { // 将之前尾节点的 next 设置为当前节点 pred.next node; // 返回当前节点 return node; } } enq(node); return node; } // 自旋CAS插入等待队列 private Node enq(final Node node) { for (;;) { Node t tail; if (t null) { // Must initialize if (compareAndSetHead(new Node())) tail head; } else { node.prev t; if (compareAndSetTail(t, node)) { t.next node; return t; } } } }上面的两个方法比较好理解就是在队列的尾部插入新的 Node 节点但是需要注意的是由于 AQS 中会存在多个线程同时争夺资源的情况因此肯定会出现多个线程同时插入节点的操作在这里是通过 CAS 自旋的方式保证了操作的线程安全性。现在通过 addWaiter 方法已经把一个 Node 放到等待队列尾部了。而处于等待队列的结点是从头结点一个一个去获取资源的。具体的实现我们来看看 acquireQueued 方法final boolean acquireQueued(final Node node, int arg) { boolean failed true; try { // interrupted用于记录线程是否被中断过 boolean interrupted false; for (;;) { // 自旋操作 // 获取当前节点的前驱节点 final Node p node.predecessor(); // 如果前驱节点是head节点并且尝试获取同步状态成功 if (p head tryAcquire(arg)) { // 设置当前节点为head节点 setHead(node); // 前驱节点的next引用设为null帮助垃圾回收器回收该节点 p.next null; // 获取同步状态成功将failed设为false failed false; // 返回线程是否被中断过 return interrupted; } // 如果应该让当前线程阻塞并且线程在阻塞时被中断则将interrupted设为true if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt()) interrupted true; } } finally { // 如果获取同步状态失败取消尝试获取同步状态 if (failed) cancelAcquire(node); } }这里 parkAndCheckInterrupt 方法内部使用到了LockSupport.park(this)顺便简单介绍一下 park 方法。LockSupport 类是 Java 6 引入的一个类提供了基本的线程同步原语。LockSupport 实际上是调用了 Unsafe 类里的方法归结到 Unsafe 里只有两个park(boolean isAbsolute, long time)阻塞当前线程unpark(Thread jthread)使给定的线程停止阻塞所以结点进入等待队列后是调用 park 使它进入阻塞状态的。只有头结点的线程是处于活跃状态的。当然获取资源的方法除了 acquire 外还有以下三个acquireInterruptibly申请可中断的资源独占模式acquireShared申请共享模式的资源acquireSharedInterruptibly申请可中断的资源共享模式可中断的意思是在线程中断时可能会抛出InterruptedException。释放资源释放资源相比于获取资源来说会简单许多。在 AQS 中只有一小段实现。源码public final boolean release(int arg) { if (tryRelease(arg)) { Node h head; if (h ! null h.waitStatus ! 0) unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { // 如果状态是负数尝试把它设置为0 int ws node.waitStatus; if (ws 0) compareAndSetWaitStatus(node, ws, 0); // 得到头结点的后继结点head.next Node s node.next; // 如果这个后继结点为空或者状态大于0 // 通过前面的定义我们知道大于0只有一种可能就是这个结点已被取消只有 Node.CANCELLED(1) 这一种状态大于0 if (s null || s.waitStatus 0) { s null; // 从尾部开始倒着寻找第一个还未取消的节点真正的后继者 for (Node t tail; t ! null t ! node; t t.prev) if (t.waitStatus 0) s t; } // 如果后继结点不为空 if (s ! null) LockSupport.unpark(s.thread); }在java.util.concurrent.locks.ReentrantLock的实现中tryRelease(arg)会减少持有锁的数量如果持有锁的数量变为0释放锁并返回true。如果tryRelease(arg)成功释放了锁那么接下来会检查队列的头结点。如果头结点存在并且waitStatus不为0这意味着有线程在等待那么会调用unparkSuccessor(Node h)方法来唤醒等待的线程。