Java线程池的正确姿势
在多核的世界里,并发编程已经是绕不开的话题。实际编写代码时,线程的使用已经变的非常非常普遍。如果单单是创建一个线程,执行一段逻辑,之后让它“自生自灭”,这显然是有问题的。首先,频繁的创建、销毁线程本身就是对资源的消耗;其次,如果创建大量的线程对于操作系统来说,可能“不堪重负”,因为线程之间的上下文切换将消耗大量系统资源;最后一点就是,随时随地创建线程,将导致线程无法得到很好的管理,执行状态和执行结果也将无法得到很好的保证。
上述的种种问题,我们需要一个池子来管理“它们”。在Java语言中,concurrent包下为我们提供了开箱即用的线程池类ThreadPoolExecutor(这里要感谢Doug Lea),它可以帮助我们节约资源,帮助我们管理任务和线程。本文旨在从细节以及原理去了解Java线程池,并且了解合理使用线程池的方式方法。
创建线程池
Java本身就提供了Executors的工厂类来帮助我们快速创建所需要的各种类型线程池:
// 固定个数工作线程的线程池 public static ExecutorService newFixedThreadPool(int nThreads); // 只有单个工作线程的线程池 public static ExecutorService newSingleThreadExecutor(); // 只要需要,可以随时创建线程的线程池 public static ExecutorService newCachedThreadPool() ...
我相信,这些方式是大部分同学创建线程池的用法。但我想说明的一点是,这里我并不推荐大家使用这些工具方法来创建线程池,注意我这里的意思是,这些方法不是不能使用,而是不太推荐。我们应该弄清楚ThreadPoolExecutor的原理,然后自己利用它的参数去构造合理的线程池,甚至继承这个线程池,然后改造它,因为只有你自己最清楚自己的业务需求。即便是这些工具方法,其内部其实也是在构造ThreadPoolExecutor时,利用构造函数做了简单封装而已:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }; public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }; public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
另外,我们自己构造线程池时,应该为线程取上有意义的名字,毕竟在排查问题时,总是比“Thread-1,Thread-2”来的直观些。
构造函数
首先来看下它的构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
里面有几个关键参数,我们依次简单介绍下:
- corePoolSize: 核心线程数。
- maximumPoolSize:最大线程数。
- keepAliveTime:线程空闲生存时间。
- unit:上述时间的时间单位。
- workQueue:任务队列,请注意,这里是个阻塞队列。
- threadFactory:线程工厂,用于创建线程来使用。
- RejectedExecutionHandler:拒绝执行的Handler,任务在某种情况下被拒绝执行,会执行这个Handler。
这里先非常简单说明下,让我们先有个大概印象,否则先入为主详细大篇幅介绍其作用,反而不能很好的理解,后续再慢慢深究各种参数的作用。但是,大家需要格外清楚的一点是,这里的每个参数都至关重要,因为它们直接影响着线程池的运行状态。
执行任务
根据上面构造完线程池,那我们就可以开始执行任务了,那就必须先从execute方法开始(其实submit方法还是调用execute):
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
首先,这里的ctl值要重点说下,这是一个AtomicInterger对象,即一个整数。它的高3位bit用来表示当前线程池的状态,剩下的29个bit用来表示当前工作线程的数量。所以它是把线程池两个关键变量,存入到一个值里面,嗯,学到了!我们来看下它的表示:
private static final int COUNT_BITS = Integer.SIZE - 3; // 即:32-3,COUNT_BITS等于29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 线程总容量:(0001111111....1)个。 // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; // 高三位 111,由于符号位是1,所以是负数 private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位 000 private static final int STOP = 1 << COUNT_BITS; // 高三位 001 private static final int TIDYING = 2 << COUNT_BITS; // 高三位 010 private static final int TERMINATED = 3 << COUNT_BITS; // 高三位 011 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 默认Running状态 // 或运算 private static int ctlOf(int rs, int wc) { return rs | wc; }
一个值,存了两个变量,那需要的时候怎么提取其中某一个变量呢?肯定要用到简单位运算了,我们可以自己先思考一下。。。
好了,看下源代码:
// 当前的工作线程个数 private static int workerCountOf(int c) { return c & CAPACITY; } // 当前的线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 由此也衍生出了很多判断方法 // 线程池是否running状态 private static boolean isRunning(int c) { return c < SHUTDOWN; } private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; }
很简单,很基础,不做多的解释。有了这些方法,我们下面就可以继续execute方法了。
接下来就是判断当前工作线程数量了,如果还没有超过核心线程数,那就直接创建线程并且运行它,所以线程池总是先把核心线程数启动起来。 AddWorker函数会利用ThreadFactory工厂构造Worker线程,其实函数最核心的就是将ctl值增加1,表示Worker线程数增加了一个,并且启动这个Worker线程执行任务;但是因为涉及到并发问题,所以这个函数内部做了很多判断,比如原子性的CAS方式对ctl值加1,多次校验当前线程池状态,以防并发情况下线程池被修改为非Running状态。不过这些细节可以暂时忽略,这样有利于我们对整体理解。
如果任务提交成功就万事大吉,可以直接return了,如果addWorker没有成功,则要么是线程池核心线程数满了,要么是线程池的状态改变了(非Running状态)。在这里我们先假设核心线程数满了,只厘清这条主线,把线程池状态的改变情况先丢到一边,稍候再讨论。
接下来,任务会被添加到workerQueue(工作队列)中。没错,这个队列就是构造函数的那个阻塞队列。调用它的offer()方法去添加到队列,这个方法在添加成功时会返回true,如果队列满了的话,会返回false。所以我们又可以得出一个结论,当Worker线程数量超过核心线程数时,任务会被加到队列。
下面又分两种情况来讨论:
1、队列已满,添加失败。会再次调用addWorker,尝试添加非核心Worker线程。所以,这里的结论是,当队列满了,才启用非核心Worker线程。同样的,如果当前worker线程总数量(包括核心Worker线程)已经达到最大线程数(maximumPoolSize),或者线程池状态非Running状态,则添加失败,后续就是执行Reject(拒绝)函数了。
-- 第二个参数布尔值表示是否核心线程数。 private boolean addWorker(Runnable firstTask, boolean core); -- 拒绝Handler处理器,这里的handler就是构造函数中的handler final void reject(Runnable command) { handler.rejectedExecution(command, this); }
其实到这里,构造函数中,除了keepAliveTime以外,其余参数我们都涉及到了,并且总结了几条各个参数之间关系的结论。
2、队列未满,添加成功。我们看到,又校验了一遍状态。每一次的分支条件之后,都要对当前状态判断,以防线程池异常状态。所以如果是非Running状态,它用remove函数从队列中移除任务,并执行reject方法。
public boolean remove(Runnable task) { boolean removed = workQueue.remove(task); tryTerminate(); // In case SHUTDOWN and now empty return removed; }
当然,我们还是考虑状态是正常的Running,然后就是判断如果当前Worker线程等于0的话,再调用一次addWorker,并且其中参数Runnable为空,core为false,即非核心线程。这里你可能会有很大疑惑,我明明刚才就是因为核心Worker线程数满了,任务加到队列的。这TM怎么又来个Worker线程数等于0的判断?没关系继续往下:
-- addWorker会利用Runnable构造一个Worker对象。 Worker w = new Worker(firstTask);
此时Runnable是一个空对象,那为什么传入空对象,意义是什么呢?看下Worker类:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } }
它自身就是实现了Runnable接口,将自身放入到实际执行任务的Thread对象中,还继承了AbstractQueueSynchronizer类,用来做并发控制和响应中断,对AQS解释可以看之前的文章。因此我们再看看实际执行任务的函数runWorker以及获取任务的getTask函数,当然这里我们把函数经过了一些变形,以便理解:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); try { beforeExecute(wt, task); task.run(); afterExecute(task, thrown); } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } // 该方法从任务队列中获取任务。 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // Are workers subject to culling? int c = ctl.get(); int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
我们看到runTask中,如果当前的任务task为空就会从任务队列Queue中获取。因此传入null的task,会创建一个工作线程,以此来帮助执行正在排队的任务。(While循环会一直获取队列中的任务)。其中还提供了beforeExecute和afterExecute的钩子函数,这两个都是空实现,在任务执行的前后执行想要的逻辑。
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { }
在getTask方法中,有一个是否允许超时的判断,允许核心线程超时或者当前线程数超过了核心线程数,默认情况下核心Worker线程是不会被销毁的,但是这个可以动态配置,通过allowCoreThreadTimeOut方法:
public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); if (value != allowCoreThreadTimeOut) { allowCoreThreadTimeOut = value; if (value) interruptIdleWorkers(); } }
因此,如果需要超时,则用poll函数,带上超时时间,这个时间便是构造函数中的keepAliveTime,至此构造函数中所有的参数,我们都明白了它的意义。poll和take都是阻塞性的,take会一直阻塞,直到有一个任务进入线程池。
如果超时还是没有获取到任务,说明队列为空,则会进入下一次循环,并且进入到下面这个判断,将线程数减1,并且退出Worker线程:
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
退出线程池
如果要关闭线程池,线程池提供了两个方法shutdown以及shutDownNow。
Shutdown
首先我们先来看下shutdown :
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
其中关键的一步advanceRunState,将线程池状态修改为SHUTDOWN,修改状态完成之后,根据上述执行任务的execute函数分析可以知道,通过这个方法再想提交任务是会失败,会走最后的reject逻辑。
通过这一步,就阻断了新的任务加入到线程池。然后interruptIdleWorkers用来中断空闲的Worker线程:
private void interruptIdleWorkers(boolean onlyOne) { // false,表示中断所有空闲线程 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
需要注意的是,中断并不是立马中止线程的执行,而是设置一个中断状态。如果该线程处于sleep或者wait等等调用导致的阻塞状态时,将会响应InterruptedException,从而退出阻塞状态。而这里将会使得getTask中阻塞性获取队列任务强行中断。这里大家可以带着思考去getTask代码中,再跟一遍代码,这里直接说结论:
如果获取任务被中断,此时在下一次循环中,将会退出Worker线程,除非任务队列中还有任务。
还需要注意的一点是,设置中断状态的前提是需要获取到当前的Worker的锁,这里会与runWorker的执行过程形成竞争,因此正在执行的Worker线程是不会被设置interrupt状态。
shutDown的最后一步便是tryTerminate(尝试终止、尝试结束):
// 注意一点,该方法在每个Worker线程退出之时,都会调用一遍。 final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
我们带着SHUTDOWN的线程池状态,继续大致走一遍这里的逻辑。
从最开始的if判断里面可以知道:
- 如果线程池处于RUNNING状态,直接不进行后续处理。但这里我们是处于SHUTDOWN状态。
- 如果线程池至少处于了TIDYING(垂死)状态,也不处理。
- 处于SHUTDOWN状态,但是如果当前任务队列不为空的话,直接return,也不进行terminate操作。
所以,我们可以看出,当前队列还有任务的话,是不会终止线程池的。
然后让我们继续沿着代码往下,能走到这一步,说明队列中的任务是空的。接下来判断当前的Worker线程个数是否是0。不为0的话,再次中断一个空闲线程,然后方法退出。如果这一步判断不成立,说明当前的线程池既没有Worker线程,也没有任务。此时,这个池子我们就可以去结束它了。所以接下来就是设置线程池状态——TIDYING(垂死状态),然后调terminated()方法(这是一个空实现函数),函数执行完之后,整个线程池的状态就进入了TERMINATED。
这里有一点需要注意的是最后执行的termination.signalAll(),这里是个信号量的唤醒操作。有什么用呢?有唤醒,就有await操作。我们看下该信号量的调用await的地方:
private final Condition termination = mainLock.newCondition(); public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
awaitTermination这是一个等待线程池关闭的public方法,并且可以指定等待多久,它在一个for循环方法里面判断线程池的状态。为什么是循环呢?因为termination.awaitNanos(nanos)等待指定的时间时,会返回0或者负数表示等待完成,如果返回正数,说明指定的等待时间还有剩余,所以把剩余的时长再次赋值给nanos,下次循环继续等待指定时长。关于该awaitNano具体说明可以参考源码注释。
因此在调用退出的过程中,我们可以通过该方法阻塞式的判断线程池是否关闭完成。
最后,我们对shutDown做个小结:该方法是个”柔性”的停止方法,首先会将线程池变为SHUTDOWN状态,并且阻止任务再进入池子,然后尝试性的停止空闲的Woker线程,正在工作的Woker线程不会做处理,正常运作,并且如果队列中依然存在任务,Worker线程依旧会继续处理队列任务。在未来的某个时间点(不确定性),当队列任务全部被处理完之时,就是线程池TERMINATED之时。
ShutDownNow
接下来,我们再看看shutDownNow关闭线程池方法:
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
可以看到,该方法有点类似shutDown,但又不同。表现在修改线程池状态为STOP;为每个Worker线程设置中断状态,即便它正在执行任务。然后利用drainQueue方法,将队列中剩余的任务全部移除,并且放入到tasks集合中,然后作为出参返回出去。
由此看出,STOP状态的线程池,既不接受任务,更加不会处理队列任务。
所以,ShutDownNow是一个”刚性” 的关闭方法,它会中止队列中剩余任务的执行,提前为每个Woker线程设置中断状态,对于正在执行任务的Worker线程时,虽然不受影响,但是在下一次循环getTask时,阻塞式从队列中获取任务便会响应到中断异常,从而退出Worker线程。如果你调用了shutdownNow方法,应当合理处理还没来得及处理的Runnable。
网上有很多文章推荐先调用shutdown,然后用awaitTermination等待某个固定时间,最后再调用shutDownNow来关闭线程池。这种方法相对粗暴的shutDownNow确实要优雅些。如果对于某些不重要的任务,我便可以直接“刚性”关闭,然后将未执行的任务打印或者记录下来,因此具体业务场景具体分析,了解了上述原理,我相信怎么关闭线程池,你自己可以做决定。
总结
最后的最后,我想多啰嗦的是,这篇文章虽然从线程池细节入手,将各种关键方法全面理了一遍,但是最好自己再带着思考阅读几遍源码,从而可以加深自己的理解。我相信,理解了之后你便可以自信地从各种角度,根据业务需求来定制最适合的线程池!