队列同步器之AbstractQueuedSynchronizer

AbstractQueuedSynchronizer是一款抽象的队列同步器,简称AQS。在Java并发包下许多工具类(如:ReentrantLockCountDownLatchThreadPoolExecutorSemaphoreReentrantReadWriteLock)都间接使用了该同步器来实现各自的功能。可以说,了解了这个同步器,就了解了并发包下的这些工具类的核心思想。本文通过一些展示AbstractQueuedSynchronizer的一些核心方法,以此来揭示它的设计思想。我相信了解了队列同步器,你便可以很轻松了解上述的一些工具类实现原理,并且可以基于这个同步器来实现定制自己的业务需求。

核心变量

首先我们看下它的一些核心的变量定义:

// 队列头
 private transient volatile Node head;
// 队列尾
 private transient volatile Node tail;
// 同步状态
 private volatile int state;
​
// Node节点的定义
static final class Node {
// 共享模式。
  static final Node SHARED = new Node();
  // 独占模式。就是一个NULL节点  
  static final Node EXCLUSIVE = null;
  // 各种等待状态定义,暂时了解一下,后面再具体分析
  static final int CANCELLED =  1;     
  static final int SIGNAL    = -1;
  static final int CONDITION = -2;
  static final int PROPAGATE = -3;
  // 当前Node的等待状态,上面定义的某一个值之一或者是0,默认初始值即为0。
  volatile int waitStatus;
  // 前序节点以及后序节点,熟悉双向链表应该就不难理解了。
  volatile Node prev;
  volatile Node next;
  // Node代表的线程,这个是Node包装的真实内容。
  volatile Thread thread;
  // 下一个等待者,后面再详细分析
  Node nextWaiter;
}

既然是同步队列,因此有队列头,队列尾的值,很好理解。值得重点关注的是这里的state,它是个非常关键的变量,当前同步器的状态,以及各个线程之间的竞争关系都是通过对该值CAS式的修改来实现的。再看下对这些值的修改:

 private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));
​
        } catch (Exception ex) { throw new Error(ex); }
    }
    // CAS设置头节点
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    // CAS设置尾节点
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
    // CAS设置下一个节点
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
    // CAS设置状态值
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

首先在static静态代码块里面,通过Unsafe类拿到这些关键变量的在内存中的位移地址。Unsafe顾名思义是不安全的,它主要用来操作底层的内存资源等。这里不展开说Unsafe类,暂时只需要做个简单理解,有兴趣同学可以网上查阅相关资料。总之,拿到了内存的位移地址,就可以知道这些变量在内存中的地址,为后续CAS操作这些变量提供了基础。

然后就是campareAndSetXXX的方法定义了,封装了Unsafe的CAS方法,来原子性的修改这些关键变量。

核心方法

有了对上述核心变量的知识,我们看下它的一些核心方法。

Acquire获取

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 节点以独占式状态,加入队列。
      selfInterrupt();
}
​
// 尝试获取
protected boolean tryAcquire(int arg) {
      throw new UnsupportedOperationException();
}

核心方法acquire,单词简单翻译过来就是获取、取得。结合同步器的语境,我把它理解为获取通行证或者取得凭证。方法的int参数可以理解为凭证的个数。

方法进来会调用尝试获取(tryAcquire),这个方法并没有任何实现,它交由子类去实现。该方法返回布尔值,表示尝试获取成功还是失败。如果尝试获取成功,则acquire直接返回;获取失败返回false,则进入后续的acquireQueued方法。

private Node addWaiter(Node mode) {
    // 初始化,包装了当前的线程。
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) { // 如果尾节点不为空,则尝试快速添加一次节点。失败了,再调用后面的enq入队列方法。
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
      }
    }
    // Node加入到队列
    enq(node);
    return node;
}    
​
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
      boolean interrupted = false;
      for (;;) {
        final Node p = node.predecessor();
        if (p == head && tryAcquire(arg)) {
          setHead(node);
          p.next = null; // help GC
          failed = false;
          return interrupted;
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
          interrupted = true;
      }
    } finally {
      if (failed)
        cancelAcquire(node);
    }
 }

tryAcquire方法失败了,则将节点加入到队列中。其中,addWaiter方法就是入队列操作,如果tail节点不为空,说明队列中已经有了数据,则快速尝试CAS加一次队列,将该节点加入到队尾,原有的tail的next指向当前节点。如果竞争不激烈情况下,这里始终可以成功,因此提高了性能;竞争激烈的情况下,这里还是回退到调用enq方法,以死循环方式,不停尝试向队尾添加node对象。

添加队列成功以后,再调用acquireQueued方法,字面意思是,让已经排队的节点获取凭证。具体看下它的实现:

1、判断当前Node的前序节点是否是head节点。由于head节点一般是个伪节点(DummyHead),它并不实际存储值,因此如果判断成立,则当前节点就是第一个实际存储值的节点,然后再次尝试获取(tryAcquire)下,获取成功则直接返回,然后头节点再次配置成伪节点。

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

2、之后的判断便是shouldParkAfterFailedAcquire,字面理解就是它的实际含义:获取失败之后,线程是否需要挂起呢?

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

代码中看出,这里就需要通过前序节点的状态(waitStatus)来判断。上述的Node状态终于在这里露出了点面目。如果前序节点是在SIGNAL状态,则返回true,否则其他状态返回false,即不需要挂起。为了方便理解,我们想象一种场景:最开始线程A进入时,acquire成功,由于这个线程执行任务比较耗时,导致后续的线程进入时,全都需要进入队列排队;因此当第一个排队线程进入时,这里的前序节点其实就是head节点,头节点的waitStatus初始是0,根据判断流程来到:compareAndSetWaitStatus(pred, ws, Node.SIGNAL),将头节点等待状态设置为SIGNAL,最后返回false。然而由于我们的A线程还在执行任务,acquireQueued来到下一次循环,很显然这次循环下的shouldParkAfterFailedAcquire返回了true,表示需要park。由此可见,它是通过修改前序节点的waitStatus,来引导自身进入park。

3、既然需要park,则就实际调用parkAndCheckInterrupt方法来挂起线程:

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 这是挂起线程的实现。
        return Thread.interrupted();
    }

它利用LockSupport.park的方法,来挂起当前的线程。其内部其实是Unsafe.park方法来实现。

4、另外一点需要说明,Thread.interrupted会清除中断状态,因此acquireQueued是不响应中断的。当线程被中断时,虽然会跳出挂起状态,但在下一次loop循环时,只要没获取到凭证,依旧会被park,即被挂起。如果需要响应中断式的acquire,AQS提供了acquireInterruptibly方法,实现大同小异,此处不再展开。

到这里,线程从进入,然后没获取到凭证进入队列,最后挂起;这整个流程我们都梳理了一下。我们可以看到这个队列同步器的大致样貌:线程进来首先尝试获取凭证,没有获取到则进入队列挂起;同样的,根据waitStatus状态,即便多个线程同时进入情况下也是依次会挂起。这就像路上堵车一般,前面的车停止了,后面来的车一样也要停止等待。但是队列并不能保证最先等待的线程一定最先获取到凭证(有可能刚进入的线程先抢占到了凭证);如果想要完全的FIFO,则你需要自己实现tryAcquire

Release释放

有获取,便有释放。只有释放了凭证,才能让等待队列中的其他线程得到调度。

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
  }
​
protected boolean tryRelease(int arg) {
       throw new UnsupportedOperationException();
 }

同样的,释放时先调用尝试释放(tryRelease),它也是空实现,需要子类去实现。尝试释放成功之后,则unpark(唤醒)头节点的后序节点:

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
​
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

先将节点(这里是Head节点)的waitStatus状态更改为0。后面就是判断如果当前的next节点是空,或者waitStatus>0(即为CANCELLED)时,从队尾开始遍历,找到最后一个不为CANCELLED状态的节点。最后,对该节点调用unpark方法来唤醒线程。

唤醒后的线程继续在acquireQueued中的循环执行,尝试获取凭证操作。因此unpark唤醒是按队列先后顺序来的,但是在tryAcquire步骤能否获取到,则”听天由命”了。

稍微总结一下:acquire默认是独占式的,尝试获取不成功则进入队列排队挂起,之后就要等待顺序地唤醒操作。但是队列并不能保证严格顺序性,因为刚进入的线程有可能先被执行(类似于窗口被插队)。AQS的子类默认需要实现tryAcquire和tryRelease操作,真正的获取和释放由子类去控制实现,AQS提供了对获取失败的线程排队,线程挂起和唤醒等底层功能。如何获取凭证、获取等等这些问题是子类需要关心的问题,因此职责交还给子类去负责。

AcquireShared共享获取

上面都是介绍了独占式获取,下面就是共享式的获取,它们有什么区别呢?

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
protected int tryAcquireShared(int arg) {
       throw new UnsupportedOperationException();
  }

共享式获取,也是先尝试获取下,tryAcquireShared同样需要子类自己实现。tryAcquireShared会返回一个整数:

  1. 返回负数。则表示获取失败。
  2. 返回0。表示共享式地获取成功;但是后续的共享式地获取不能成功。
  3. 返回正数。表示共享式获取成功,后续的共享式获取也可能成功。

这里我们需要仔细理解为什么tryAcquireShared返回整数,而tryAcquire返回布尔值?只有这样才能方便我们往下面理解。

独占式获取是只允许一个线程获取到凭证而执行,其余都需要等待;而共享式获取则允许一个或多个线程同时获取并执行,因此它返回一个整数来区分多种情况;这也是这两种不同获取方式一个重要的区别。后续的理解都要抓住这个本质。

因此,获取失败之后,进入到真正的共享式获取逻辑:doAcquireShared。

 private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

有了之前分析acquire的逻辑,这里我们就非常容易理解了。节点添加到队列,然后进入循环,判断当前是否是首个实际节点。如果是的话,再次尝试共享式获取,这里需要注意的是,当获取成功(即返回的r>=0)时,同样要重新设置head,同时添加了一个Propagate(传播)功能,那这个传播作用是什么呢?

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

从这段逻辑我们可以看到,当tryAcquireShared返回为正数时(即propagate参数),表明其他线程还可以共享式获取,并且后序节点如果也是共享状态,则可以得到唤醒。这里的doReleaseShared后面将会分析。我们只需要知道传播Propagate就是用来判断后续节点是否也是共享状态,是的话,同样将它唤醒,这也是共享式的含义。

后面的shouldParkAfterFailedAcquire以及parkAndCheckInterrupt就不再多赘述,上面已经做过分析,这里表示:如果共享式资源已经被其他线程获取完了,那其它的线程只能挂起。

并且这个方法对head做了两次空判断,以及对它的waitStatus做了<0的判断,以此来保证传播性。可能这一层的判断,有点不太好理解。这里判断的含义,我们等下文的共享释放理解之后,一起来尝试说明。

ReleaseShared共享释放

共享式获取同样对应有一个共享式释放:

 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
​
protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

tryReleaseShared也是一个由子类实现的方法。尝试共享释放(tryReleaseShared)成功后,也会进入doReleaseShared方法。

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) { // head如果等于tail,说明队列为空
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

这里判断头节点的waitStatus值,如果为SIGNAL,则尝试唤醒Head节点的后续节点。因为SIGNAL状态代表着下一个节点正在Park挂起状态。然后就是CAS修改头节点状态,如果修改失败,说明当前有另外的线程并发修改Head状态值,于是loop循环需要重新来一遍。

如果当前的状态值为0,则CAS修改状态为PROPAGATE状态。这里的设置和setHeadAndPropagate方法中的h.waitStatus < 0判断遥相呼应。

我们要清楚一点就是,共享式的方式要比独占式情况复杂一些。因为共享式涉及到了多个线程可能同时acquire,然后可能同时release。因此等待队列中的Head是一个竞争关键资源。所以doReleaseShared中就要利用死循环来重复执行,因为head竞争有可能失败,所以不得不重试,性能上就会有些损耗。

我们再来假设上述的场景,以便理解:线程A和线程B,它们两个获取完了所有的共享凭证并开始执行各自任务,其余进来的线程不得不排队挂起等待。由于线程A和B都是耗时操作,所有的线程挂起之后,队列中的每个节点(tail除外)的waitStatus此时都是SIGNAL。一段时间之后,线程A和线程B同时释放了凭证。在doReleaseShared方法中,它们俩都会竞争Head节点,表现为CAS修改状态。我们再假设,B线程竞争胜利,它成功将waitStatus从SIGNAL状态修改为0,然后唤醒后序节点线程C。与此同时A线程开始loop循环重试。C线程被唤醒之后又尝试获取共享凭证,成功之后进入setHeadAndPropagate方法。好巧不巧,A线程重试之后,将head的waitStatus修改为了PROPAGATE,因此不管此时的C线程有没有执行到setHead(node)(改变头节点)这一步,传播状态都将延续,因为doReleaseShared依旧可以执行,因为C线程的h.waitStatus < 0是成立的,如果setHead已经被执行,无非A线程将再次loop循环,这可能会导致有多余的唤醒,这里的解释我们看下作者在源代码中的注释就明白:

private void setHeadAndPropagate(Node node, int propagate) {
        ...
          
        /*
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

共享式获取和释放的逻辑确实是有些难懂理解的,不防多读几遍,结合代码在纸上多演练一下。这也体现了并发编程的难度以及代码阅读性差。同样也告诫我们,对待多并发编程一定要对各种情况进行充分考量,并发情况下很多问题有时候仅仅通过测试是很难复线的