Java线程池的正确姿势

在多核的世界里,并发编程已经是绕不开的话题。实际编写代码时,线程的使用已经变的非常非常普遍。如果单单是创建一个线程,执行一段逻辑,之后让它“自生自灭”,这显然是有问题的。首先,频繁的创建、销毁线程本身就是对资源的消耗;其次,如果创建大量的线程对于操作系统来说,可能“不堪重负”,因为线程之间的上下文切换将消耗大量系统资源;最后一点就是,随时随地创建线程,将导致线程无法得到很好的管理,执行状态和执行结果也将无法得到很好的保证。

上述的种种问题,我们需要一个池子来管理“它们”。在Java语言中,concurrent包下为我们提供了开箱即用的线程池类ThreadPoolExecutor(这里要感谢Doug Lea),它可以帮助我们节约资源,帮助我们管理任务和线程。本文旨在从细节以及原理去了解Java线程池,并且了解合理使用线程池的方式方法。

创建线程池

Java本身就提供了Executors的工厂类来帮助我们快速创建所需要的各种类型线程池:

// 固定个数工作线程的线程池
public static ExecutorService newFixedThreadPool(int nThreads);
// 只有单个工作线程的线程池
public static ExecutorService newSingleThreadExecutor();
// 只要需要,可以随时创建线程的线程池
public static ExecutorService newCachedThreadPool()
...

我相信,这些方式是大部分同学创建线程池的用法。但我想说明的一点是,这里我并不推荐大家使用这些工具方法来创建线程池,注意我这里的意思是,这些方法不是不能使用,而是不太推荐。我们应该弄清楚ThreadPoolExecutor的原理,然后自己利用它的参数去构造合理的线程池,甚至继承这个线程池,然后改造它,因为只有你自己最清楚自己的业务需求。即便是这些工具方法,其内部其实也是在构造ThreadPoolExecutor时,利用构造函数做了简单封装而已:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    };
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    };
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

另外,我们自己构造线程池时,应该为线程取上有意义的名字,毕竟在排查问题时,总是比“Thread-1,Thread-2”来的直观些。

构造函数

首先来看下它的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

里面有几个关键参数,我们依次简单介绍下:

  • corePoolSize: 核心线程数。
  • maximumPoolSize:最大线程数。
  • keepAliveTime:线程空闲生存时间。
  • unit:上述时间的时间单位。
  • workQueue:任务队列,请注意,这里是个阻塞队列。
  • threadFactory:线程工厂,用于创建线程来使用。
  • RejectedExecutionHandler:拒绝执行的Handler,任务在某种情况下被拒绝执行,会执行这个Handler。

这里先非常简单说明下,让我们先有个大概印象,否则先入为主详细大篇幅介绍其作用,反而不能很好的理解,后续再慢慢深究各种参数的作用。但是,大家需要格外清楚的一点是,这里的每个参数都至关重要,因为它们直接影响着线程池的运行状态。

执行任务

根据上面构造完线程池,那我们就可以开始执行任务了,那就必须先从execute方法开始(其实submit方法还是调用execute):

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

首先,这里的ctl值要重点说下,这是一个AtomicInterger对象,即一个整数。它的高3位bit用来表示当前线程池的状态,剩下的29个bit用来表示当前工作线程的数量。所以它是把线程池两个关键变量,存入到一个值里面,嗯,学到了!我们来看下它的表示:

private static final int COUNT_BITS = Integer.SIZE - 3; // 即:32-3,COUNT_BITS等于29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 线程总容量:(0001111111....1)个。
​
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; // 高三位 111,由于符号位是1,所以是负数
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 高三位 000 
    private static final int STOP       =  1 << COUNT_BITS; // 高三位 001
    private static final int TIDYING    =  2 << COUNT_BITS; // 高三位 010
    private static final int TERMINATED =  3 << COUNT_BITS; // 高三位 011
​
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 默认Running状态
    
    // 或运算 
    private static int ctlOf(int rs, int wc) { 
      return rs | wc; 
    }

一个值,存了两个变量,那需要的时候怎么提取其中某一个变量呢?肯定要用到简单位运算了,我们可以自己先思考一下。。。

好了,看下源代码:

// 当前的工作线程个数
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 当前的线程池状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
​
// 由此也衍生出了很多判断方法
​
// 线程池是否running状态
private static boolean isRunning(int c) {
   return c < SHUTDOWN;
}
​
private static boolean runStateLessThan(int c, int s) {
   return c < s;
}
​
private static boolean runStateAtLeast(int c, int s) {
  return c >= s;
}
​

很简单,很基础,不做多的解释。有了这些方法,我们下面就可以继续execute方法了。

接下来就是判断当前工作线程数量了,如果还没有超过核心线程数,那就直接创建线程并且运行它,所以线程池总是先把核心线程数启动起来。 AddWorker函数会利用ThreadFactory工厂构造Worker线程,其实函数最核心的就是将ctl值增加1,表示Worker线程数增加了一个,并且启动这个Worker线程执行任务;但是因为涉及到并发问题,所以这个函数内部做了很多判断,比如原子性的CAS方式对ctl值加1,多次校验当前线程池状态,以防并发情况下线程池被修改为非Running状态。不过这些细节可以暂时忽略,这样有利于我们对整体理解。

如果任务提交成功就万事大吉,可以直接return了,如果addWorker没有成功,则要么是线程池核心线程数满了,要么是线程池的状态改变了(非Running状态)。在这里我们先假设核心线程数满了,只厘清这条主线,把线程池状态的改变情况先丢到一边,稍候再讨论。

接下来,任务会被添加到workerQueue(工作队列)中。没错,这个队列就是构造函数的那个阻塞队列。调用它的offer()方法去添加到队列,这个方法在添加成功时会返回true,如果队列满了的话,会返回false。所以我们又可以得出一个结论,当Worker线程数量超过核心线程数时,任务会被加到队列。

下面又分两种情况来讨论:

1、队列已满,添加失败。会再次调用addWorker,尝试添加非核心Worker线程。所以,这里的结论是,当队列满了,才启用非核心Worker线程。同样的,如果当前worker线程总数量(包括核心Worker线程)已经达到最大线程数(maximumPoolSize),或者线程池状态非Running状态,则添加失败,后续就是执行Reject(拒绝)函数了。

-- 第二个参数布尔值表示是否核心线程数。
private boolean addWorker(Runnable firstTask, boolean core);
-- 拒绝Handler处理器,这里的handler就是构造函数中的handler  
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

其实到这里,构造函数中,除了keepAliveTime以外,其余参数我们都涉及到了,并且总结了几条各个参数之间关系的结论。

2、队列未满,添加成功。我们看到,又校验了一遍状态。每一次的分支条件之后,都要对当前状态判断,以防线程池异常状态。所以如果是非Running状态,它用remove函数从队列中移除任务,并执行reject方法。

public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }

当然,我们还是考虑状态是正常的Running,然后就是判断如果当前Worker线程等于0的话,再调用一次addWorker,并且其中参数Runnable为空,core为false,即非核心线程。这里你可能会有很大疑惑,我明明刚才就是因为核心Worker线程数满了,任务加到队列的。这TM怎么又来个Worker线程数等于0的判断?没关系继续往下:

-- addWorker会利用Runnable构造一个Worker对象。
Worker w = new Worker(firstTask);

此时Runnable是一个空对象,那为什么传入空对象,意义是什么呢?看下Worker类:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{
  
   Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
  }
}  

它自身就是实现了Runnable接口,将自身放入到实际执行任务的Thread对象中,还继承了AbstractQueueSynchronizer类,用来做并发控制和响应中断,对AQS解释可以看之前的文章。因此我们再看看实际执行任务的函数runWorker以及获取任务的getTask函数,当然这里我们把函数经过了一些变形,以便理解:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
               w.lock();
                try {
                    beforeExecute(wt, task);
                    task.run();
                    afterExecute(task, thrown);
                } finally {
                    task = null;
                    w.completedTasks++;
                   w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
   }

// 该方法从任务队列中获取任务。
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
           // Are workers subject to culling?
           int c = ctl.get();
            int wc = workerCountOf(c);
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
           if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

我们看到runTask中,如果当前的任务task为空就会从任务队列Queue中获取。因此传入null的task,会创建一个工作线程,以此来帮助执行正在排队的任务。(While循环会一直获取队列中的任务)。其中还提供了beforeExecuteafterExecute的钩子函数,这两个都是空实现,在任务执行的前后执行想要的逻辑。

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

在getTask方法中,有一个是否允许超时的判断,允许核心线程超时或者当前线程数超过了核心线程数,默认情况下核心Worker线程是不会被销毁的,但是这个可以动态配置,通过allowCoreThreadTimeOut方法

public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)
                interruptIdleWorkers();
        }
    }

因此,如果需要超时,则用poll函数,带上超时时间,这个时间便是构造函数中的keepAliveTime,至此构造函数中所有的参数,我们都明白了它的意义。polltake都是阻塞性的,take会一直阻塞,直到有一个任务进入线程池。

如果超时还是没有获取到任务,说明队列为空,则会进入下一次循环,并且进入到下面这个判断,将线程数减1,并且退出Worker线程:

if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
}

退出线程池

如果要关闭线程池,线程池提供了两个方法shutdown以及shutDownNow

Shutdown

首先我们先来看下shutdown :

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

其中关键的一步advanceRunState,将线程池状态修改为SHUTDOWN,修改状态完成之后,根据上述执行任务的execute函数分析可以知道,通过这个方法再想提交任务是会失败,会走最后的reject逻辑。

通过这一步,就阻断了新的任务加入到线程池。然后interruptIdleWorkers用来中断空闲的Worker线程:

private void interruptIdleWorkers(boolean onlyOne) { // false,表示中断所有空闲线程
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

需要注意的是,中断并不是立马中止线程的执行,而是设置一个中断状态。如果该线程处于sleep或者wait等等调用导致的阻塞状态时,将会响应InterruptedException,从而退出阻塞状态。而这里将会使得getTask中阻塞性获取队列任务强行中断。这里大家可以带着思考去getTask代码中,再跟一遍代码,这里直接说结论:

如果获取任务被中断,此时在下一次循环中,将会退出Worker线程,除非任务队列中还有任务

还需要注意的一点是,设置中断状态的前提是需要获取到当前的Worker的锁,这里会与runWorker的执行过程形成竞争,因此正在执行的Worker线程是不会被设置interrupt状态。

shutDown的最后一步便是tryTerminate(尝试终止、尝试结束)

// 注意一点,该方法在每个Worker线程退出之时,都会调用一遍。
final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

我们带着SHUTDOWN的线程池状态,继续大致走一遍这里的逻辑。

从最开始的if判断里面可以知道:

  1. 如果线程池处于RUNNING状态,直接不进行后续处理。但这里我们是处于SHUTDOWN状态。
  2. 如果线程池至少处于了TIDYING(垂死)状态,也不处理。
  3. 处于SHUTDOWN状态,但是如果当前任务队列不为空的话,直接return,也不进行terminate操作。

所以,我们可以看出,当前队列还有任务的话,是不会终止线程池的。

然后让我们继续沿着代码往下,能走到这一步,说明队列中的任务是空的。接下来判断当前的Worker线程个数是否是0。不为0的话,再次中断一个空闲线程,然后方法退出。如果这一步判断不成立,说明当前的线程池既没有Worker线程,也没有任务。此时,这个池子我们就可以去结束它了。所以接下来就是设置线程池状态——TIDYING(垂死状态),然后调terminated()方法(这是一个空实现函数),函数执行完之后,整个线程池的状态就进入了TERMINATED。

这里有一点需要注意的是最后执行的termination.signalAll(),这里是个信号量的唤醒操作。有什么用呢?有唤醒,就有await操作。我们看下该信号量的调用await的地方:

private final Condition termination = mainLock.newCondition();

public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

awaitTermination这是一个等待线程池关闭的public方法,并且可以指定等待多久,它在一个for循环方法里面判断线程池的状态。为什么是循环呢?因为termination.awaitNanos(nanos)等待指定的时间时,会返回0或者负数表示等待完成,如果返回正数,说明指定的等待时间还有剩余,所以把剩余的时长再次赋值给nanos,下次循环继续等待指定时长。关于该awaitNano具体说明可以参考源码注释。

因此在调用退出的过程中,我们可以通过该方法阻塞式的判断线程池是否关闭完成。

最后,我们对shutDown做个小结:该方法是个”柔性”的停止方法,首先会将线程池变为SHUTDOWN状态,并且阻止任务再进入池子,然后尝试性的停止空闲的Woker线程,正在工作的Woker线程不会做处理,正常运作,并且如果队列中依然存在任务,Worker线程依旧会继续处理队列任务。在未来的某个时间点(不确定性),当队列任务全部被处理完之时,就是线程池TERMINATED之时。

ShutDownNow

接下来,我们再看看shutDownNow关闭线程池方法:

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

可以看到,该方法有点类似shutDown,但又不同。表现在修改线程池状态为STOP;为每个Worker线程设置中断状态,即便它正在执行任务。然后利用drainQueue方法,将队列中剩余的任务全部移除,并且放入到tasks集合中,然后作为出参返回出去。

由此看出,STOP状态的线程池,既不接受任务,更加不会处理队列任务。

所以,ShutDownNow是一个”刚性” 的关闭方法,它会中止队列中剩余任务的执行,提前为每个Woker线程设置中断状态,对于正在执行任务的Worker线程时,虽然不受影响,但是在下一次循环getTask时,阻塞式从队列中获取任务便会响应到中断异常,从而退出Worker线程。如果你调用了shutdownNow方法,应当合理处理还没来得及处理的Runnable。

网上有很多文章推荐先调用shutdown,然后用awaitTermination等待某个固定时间,最后再调用shutDownNow来关闭线程池。这种方法相对粗暴的shutDownNow确实要优雅些。如果对于某些不重要的任务,我便可以直接“刚性”关闭,然后将未执行的任务打印或者记录下来,因此具体业务场景具体分析,了解了上述原理,我相信怎么关闭线程池,你自己可以做决定。

总结

最后的最后,我想多啰嗦的是,这篇文章虽然从线程池细节入手,将各种关键方法全面理了一遍,但是最好自己再带着思考阅读几遍源码,从而可以加深自己的理解。我相信,理解了之后你便可以自信地从各种角度,根据业务需求来定制最适合的线程池!