队列同步器之AbstractQueuedSynchronizer
AbstractQueuedSynchronizer是一款抽象的队列同步器,简称AQS。在Java并发包下许多工具类(如:ReentrantLock,CountDownLatch,ThreadPoolExecutor,Semaphore,ReentrantReadWriteLock)都间接使用了该同步器来实现各自的功能。可以说,了解了这个同步器,就了解了并发包下的这些工具类的核心思想。本文通过一些展示AbstractQueuedSynchronizer的一些核心方法,以此来揭示它的设计思想。我相信了解了队列同步器,你便可以很轻松了解上述的一些工具类实现原理,并且可以基于这个同步器来实现定制自己的业务需求。
核心变量
首先我们看下它的一些核心的变量定义:
// 队列头 private transient volatile Node head; // 队列尾 private transient volatile Node tail; // 同步状态 private volatile int state; // Node节点的定义 static final class Node { // 共享模式。 static final Node SHARED = new Node(); // 独占模式。就是一个NULL节点 static final Node EXCLUSIVE = null; // 各种等待状态定义,暂时了解一下,后面再具体分析 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; // 当前Node的等待状态,上面定义的某一个值之一或者是0,默认初始值即为0。 volatile int waitStatus; // 前序节点以及后序节点,熟悉双向链表应该就不难理解了。 volatile Node prev; volatile Node next; // Node代表的线程,这个是Node包装的真实内容。 volatile Thread thread; // 下一个等待者,后面再详细分析 Node nextWaiter; }
既然是同步队列,因此有队列头,队列尾的值,很好理解。值得重点关注的是这里的state,它是个非常关键的变量,当前同步器的状态,以及各个线程之间的竞争关系都是通过对该值CAS式的修改来实现的。再看下对这些值的修改:
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } // CAS设置头节点 private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } // CAS设置尾节点 private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } // CAS设置下一个节点 private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } // CAS设置状态值 protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
首先在static静态代码块里面,通过Unsafe类拿到这些关键变量的在内存中的位移地址。Unsafe顾名思义是不安全的,它主要用来操作底层的内存资源等。这里不展开说Unsafe类,暂时只需要做个简单理解,有兴趣同学可以网上查阅相关资料。总之,拿到了内存的位移地址,就可以知道这些变量在内存中的地址,为后续CAS操作这些变量提供了基础。
然后就是campareAndSetXXX的方法定义了,封装了Unsafe的CAS方法,来原子性的修改这些关键变量。
核心方法
有了对上述核心变量的知识,我们看下它的一些核心方法。
Acquire获取
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 节点以独占式状态,加入队列。 selfInterrupt(); } // 尝试获取 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
核心方法acquire,单词简单翻译过来就是获取、取得。结合同步器的语境,我把它理解为获取通行证或者取得凭证。方法的int参数可以理解为凭证的个数。
方法进来会调用尝试获取(tryAcquire),这个方法并没有任何实现,它交由子类去实现。该方法返回布尔值,表示尝试获取成功还是失败。如果尝试获取成功,则acquire直接返回;获取失败返回false,则进入后续的acquireQueued方法。
private Node addWaiter(Node mode) { // 初始化,包装了当前的线程。 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { // 如果尾节点不为空,则尝试快速添加一次节点。失败了,再调用后面的enq入队列方法。 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // Node加入到队列 enq(node); return node; } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
tryAcquire方法失败了,则将节点加入到队列中。其中,addWaiter方法就是入队列操作,如果tail节点不为空,说明队列中已经有了数据,则快速尝试CAS加一次队列,将该节点加入到队尾,原有的tail的next指向当前节点。如果竞争不激烈情况下,这里始终可以成功,因此提高了性能;竞争激烈的情况下,这里还是回退到调用enq方法,以死循环方式,不停尝试向队尾添加node对象。
添加队列成功以后,再调用acquireQueued方法,字面意思是,让已经排队的节点获取凭证。具体看下它的实现:
1、判断当前Node的前序节点是否是head节点。由于head节点一般是个伪节点(DummyHead),它并不实际存储值,因此如果判断成立,则当前节点就是第一个实际存储值的节点,然后再次尝试获取(tryAcquire)下,获取成功则直接返回,然后头节点再次配置成伪节点。
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
2、之后的判断便是shouldParkAfterFailedAcquire,字面理解就是它的实际含义:获取失败之后,线程是否需要挂起呢?
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
代码中看出,这里就需要通过前序节点的状态(waitStatus)来判断。上述的Node状态终于在这里露出了点面目。如果前序节点是在SIGNAL状态,则返回true,否则其他状态返回false,即不需要挂起。为了方便理解,我们想象一种场景:最开始线程A进入时,acquire成功,由于这个线程执行任务比较耗时,导致后续的线程进入时,全都需要进入队列排队;因此当第一个排队线程进入时,这里的前序节点其实就是head节点,头节点的waitStatus初始是0,根据判断流程来到:compareAndSetWaitStatus(pred, ws, Node.SIGNAL),将头节点等待状态设置为SIGNAL,最后返回false。然而由于我们的A线程还在执行任务,acquireQueued来到下一次循环,很显然这次循环下的shouldParkAfterFailedAcquire返回了true,表示需要park。由此可见,它是通过修改前序节点的waitStatus,来引导自身进入park。
3、既然需要park,则就实际调用parkAndCheckInterrupt方法来挂起线程:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 这是挂起线程的实现。 return Thread.interrupted(); }
它利用LockSupport.park的方法,来挂起当前的线程。其内部其实是Unsafe.park方法来实现。
4、另外一点需要说明,Thread.interrupted
会清除中断状态,因此acquireQueued是不响应中断的。当线程被中断时,虽然会跳出挂起状态,但在下一次loop循环时,只要没获取到凭证,依旧会被park,即被挂起。如果需要响应中断式的acquire,AQS提供了acquireInterruptibly方法,实现大同小异,此处不再展开。
到这里,线程从进入,然后没获取到凭证进入队列,最后挂起;这整个流程我们都梳理了一下。我们可以看到这个队列同步器的大致样貌:线程进来首先尝试获取凭证,没有获取到则进入队列挂起;同样的,根据waitStatus状态,即便多个线程同时进入情况下也是依次会挂起。这就像路上堵车一般,前面的车停止了,后面来的车一样也要停止等待。但是队列并不能保证最先等待的线程一定最先获取到凭证(有可能刚进入的线程先抢占到了凭证);如果想要完全的FIFO,则你需要自己实现tryAcquire。
Release释放
有获取,便有释放。只有释放了凭证,才能让等待队列中的其他线程得到调度。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
同样的,释放时先调用尝试释放(tryRelease),它也是空实现,需要子类去实现。尝试释放成功之后,则unpark(唤醒)头节点的后序节点:
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
先将节点(这里是Head节点)的waitStatus状态更改为0。后面就是判断如果当前的next节点是空,或者waitStatus>0(即为CANCELLED)时,从队尾开始遍历,找到最后一个不为CANCELLED状态的节点。最后,对该节点调用unpark方法来唤醒线程。
唤醒后的线程继续在acquireQueued中的循环执行,尝试获取凭证操作。因此unpark唤醒是按队列先后顺序来的,但是在tryAcquire步骤能否获取到,则”听天由命”了。
稍微总结一下:acquire默认是独占式的,尝试获取不成功则进入队列排队挂起,之后就要等待顺序地唤醒操作。但是队列并不能保证严格顺序性,因为刚进入的线程有可能先被执行(类似于窗口被插队)。AQS的子类默认需要实现tryAcquire和tryRelease操作,真正的获取和释放由子类去控制实现,AQS提供了对获取失败的线程排队,线程挂起和唤醒等底层功能。如何获取凭证、获取等等这些问题是子类需要关心的问题,因此职责交还给子类去负责。
AcquireShared共享获取
上面都是介绍了独占式获取,下面就是共享式的获取,它们有什么区别呢?
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
共享式获取,也是先尝试获取下,tryAcquireShared同样需要子类自己实现。tryAcquireShared会返回一个整数:
- 返回负数。则表示获取失败。
- 返回0。表示共享式地获取成功;但是后续的共享式地获取不能成功。
- 返回正数。表示共享式获取成功,后续的共享式获取也可能成功。
这里我们需要仔细理解为什么tryAcquireShared返回整数,而tryAcquire返回布尔值?只有这样才能方便我们往下面理解。
独占式获取是只允许一个线程获取到凭证而执行,其余都需要等待;而共享式获取则允许一个或多个线程同时获取并执行,因此它返回一个整数来区分多种情况;这也是这两种不同获取方式一个重要的区别。后续的理解都要抓住这个本质。
因此,获取失败之后,进入到真正的共享式获取逻辑:doAcquireShared。
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
有了之前分析acquire的逻辑,这里我们就非常容易理解了。节点添加到队列,然后进入循环,判断当前是否是首个实际节点。如果是的话,再次尝试共享式获取,这里需要注意的是,当获取成功(即返回的r>=0)时,同样要重新设置head,同时添加了一个Propagate(传播)功能,那这个传播作用是什么呢?
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
从这段逻辑我们可以看到,当tryAcquireShared返回为正数时(即propagate参数),表明其他线程还可以共享式获取,并且后序节点如果也是共享状态,则可以得到唤醒。这里的doReleaseShared后面将会分析。我们只需要知道传播Propagate就是用来判断后续节点是否也是共享状态,是的话,同样将它唤醒,这也是共享式的含义。
后面的shouldParkAfterFailedAcquire以及parkAndCheckInterrupt就不再多赘述,上面已经做过分析,这里表示:如果共享式资源已经被其他线程获取完了,那其它的线程只能挂起。
并且这个方法对head做了两次空判断,以及对它的waitStatus做了<0的判断,以此来保证传播性。可能这一层的判断,有点不太好理解。这里判断的含义,我们等下文的共享释放理解之后,一起来尝试说明。
ReleaseShared共享释放
共享式获取同样对应有一个共享式释放:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
tryReleaseShared也是一个由子类实现的方法。尝试共享释放(tryReleaseShared)成功后,也会进入doReleaseShared方法。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { // head如果等于tail,说明队列为空 int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
这里判断头节点的waitStatus值,如果为SIGNAL,则尝试唤醒Head节点的后续节点。因为SIGNAL状态代表着下一个节点正在Park挂起状态。然后就是CAS修改头节点状态,如果修改失败,说明当前有另外的线程并发修改Head状态值,于是loop循环需要重新来一遍。
如果当前的状态值为0,则CAS修改状态为PROPAGATE状态。这里的设置和setHeadAndPropagate方法中的h.waitStatus < 0
判断遥相呼应。
我们要清楚一点就是,共享式的方式要比独占式情况复杂一些。因为共享式涉及到了多个线程可能同时acquire,然后可能同时release。因此等待队列中的Head是一个竞争关键资源。所以doReleaseShared中就要利用死循环来重复执行,因为head竞争有可能失败,所以不得不重试,性能上就会有些损耗。
我们再来假设上述的场景,以便理解:线程A和线程B,它们两个获取完了所有的共享凭证并开始执行各自任务,其余进来的线程不得不排队挂起等待。由于线程A和B都是耗时操作,所有的线程挂起之后,队列中的每个节点(tail除外)的waitStatus此时都是SIGNAL。一段时间之后,线程A和线程B同时释放了凭证。在doReleaseShared方法中,它们俩都会竞争Head节点,表现为CAS修改状态。我们再假设,B线程竞争胜利,它成功将waitStatus从SIGNAL状态修改为0,然后唤醒后序节点线程C。与此同时A线程开始loop循环重试。C线程被唤醒之后又尝试获取共享凭证,成功之后进入setHeadAndPropagate方法。好巧不巧,A线程重试之后,将head的waitStatus修改为了PROPAGATE,因此不管此时的C线程有没有执行到setHead(node)
(改变头节点)这一步,传播状态都将延续,因为doReleaseShared依旧可以执行,因为C线程的h.waitStatus < 0
是成立的,如果setHead已经被执行,无非A线程将再次loop循环,这可能会导致有多余的唤醒,这里的解释我们看下作者在源代码中的注释就明白:
private void setHeadAndPropagate(Node node, int propagate) { ... /* * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
共享式获取和释放的逻辑确实是有些难懂理解的,不防多读几遍,结合代码在纸上多演练一下。这也体现了并发编程的难度以及代码阅读性差。同样也告诫我们,对待多并发编程一定要对各种情况进行充分考量,并发情况下很多问题有时候仅仅通过测试是很难复线的。