安利一波组织:itsCoder
转载请附原文链接:ThreadPoolExecutor源码学习笔记
大部分分析以注释形式写在源码中
本篇笔记将从 ThreadPoolExecutor 的一次使用上来分析源码,主要涉及线程池创建,execute 的步骤,任务添加到阻塞队列,线程从阻塞队列中拿取任务执行,线程的回收,线程池的终止。
涉及到的类有
线程池的获取
我们知道可以通过 Executors 来获取不同类型的线程池,那么就从 Executors 来开始看它是如何返回不同类型的线程池的,看看我们常用的一些方法
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
|
从上面的三个方法可以发现其实都是 new 了一个 ThreadPoolExecutor ,但是传入的参数不同,我们进到这个构造方法中去一探究竟,看看不同的参数到底代表了什么
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
|
通过参数名称以及注释可以知道这几个参数的作用分别是
- corePoolSize — 核心线程数,即允许闲置的线程数目
- maximumPoolSize — 最大线程数,即这个线程池的容量
- keepAliveTime — 非核心线程的闲置存活时间
- unit — 上一个参数的单位
- workQueue — 任务队列(阻塞队列)
- threadFacotry — 线程创建工厂
- handler — 当线程池或者任务队列容量已满时用于 reject
这里要明白一件事情,核心线程只是通过数目来判断,而不是说先创建的线程就是核心线程。
这句话可能有点难懂,我大概解释一下,线程池这个核心线程数的用处就是来判断当前这个闲置线程是否应该回收,那么什么是闲置线程呢?一个线程执行完了一个任务后,会去阻塞队列里面取新的任务,在取到任务之前它就是一个闲置的线程,取任务的方法有两个,一个是一直阻塞直到取出任务,另一个是一定时间内阻塞直到取出任务或者超时,如果超时这个线程就会被回收,我们知道核心线程一般不会被回收。
线程在取任务的时候,线程池会比较当前的有效线程数和允许的核心线程数,如果小于当前的核心线程数则使用第一个方法取任务,也就是没有超时回收,如果大于核心线程数,则使用第二个,一旦超时就回收,所以,并没有绝对的核心线程,只要这个线程出于闲置状态就有被回收的可能。
还有一种情况是设置了线程池允许核心线程超时回收,那么无论线程数有多少,统统会使用第二个方法取任务。
任务的执行
A.状态属性
在看源码之前先了解一下 ThreadPoolExecutor 的几个状态属性,这对后面的源码阅读有很重要的作用,ThreadPoolExecutor 有五种状态
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
|
从上到下依次是
- RUNNING — 运行状态,可以添加新任务,也可以处理阻塞队列中的任务
- SHUTDOWN — 待关闭状态,不再接受新的任务,会继续处理阻塞队列中的任务
- STOP — 停止状态,此时的线程池不处理任何任务
- TIDYING — 整理状态,也可以理解为预终结状态,这个时候任务都处理完毕,池中无有效线程
- TERMINATED — 终止状态
B.execute(Runnable command)
当获取到了一个线程池之后,需要它来执行异步任务,也就是 execute(Runnable) ,传入一个 runnable 对象,在 run 方法中执行我们的代码,那么来看一下 execute() 是怎么工作的,因为源码的注释解释得十分清楚,这里将注释也贴出来。简单翻译一下,当 execute 被调用时总共有三种情况。
- 如果当前的有效线程数小于核心线程数,则试图创建一个新的 worker 线程
- 如果上面一步失败了,则试图将任务添加到阻塞队列中,并且要再一次判断需要不需要回滚队列,或者说创建线程(后面会详细说明)
- 如果上面两步都失败了,则会试图强行创建一个线程来执行这个任务,如果还是失败,扔掉这个任务
了解了这三个步骤,来看看源码,源码中调用了 addworker 方法,这是创建线程的方法,会在后面讲到
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
|
解释一下第二步,为什么要recheck
当这个任务被添加到了阻塞队列前,池子处于 RUNNING 状态,但如果在添加到队列成功后,池子进入了 SHUTDOWN 状态或者其他状态,这时候是不应该再接收新的任务的,所以需要把这个任务从队列中移除,并且 reject
同样,在没有添加到队列前,可能有一个有效线程,但添加完任务后,这个线程闲置超时或者因为异常被干掉了,这时候需要创建一个新的线程来执行任务
为了更直观的理解一个任务的执行过程,我画了一张图:
C .addWorker()
前一步把 execute 的流程捋了一遍,里面多次出现了 addWorker() 方法,前文说到这是个创建线程的方法,来看看 addWorker 做了些什么,这个方法代码比较长,我们拆开来一点一点看
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } }
|
创建一个Worker(什么东西?下文会讲解,这里把它就当成是一个线程的容器)
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;
|
上面代码从逻辑层面来看不算难懂,到这里一个任务到达后,ThreadPoolExecutor 的处理就结束了,那么任务又是怎么被添加到阻塞队列中,线程是如何从队列中取出任务,上文中的 Worker 又是什么东西?
一个一个来,先来看看 Worker 到底是什么
D.Worker
Worker 是 ThreadPoolExecutor 的一个内部类,实现了 Runnable 接口,继承自 AbstractQueuedSynchronizer,这又是个什么鬼???我也不造~可以看看这篇文章
《Java并发包源码学习之AQS框架(一)概述》
简单来说,Worker实现了 lock 和 unLock 方法来标示当前线程的状态是否为闲置
public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); }
|
上一节创建线程成功后调用 t.start() 而这个线程又是 Worker 的成员变量
Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
|
可以看到这里将 Worker 作为 Runnable 参数创建了一个新的线程,我们知道 Thread 接收一个 Runnable 对象后 start 运行的是 Runnable 的 run 方法,Worker 的 run 方法调用了 runWorker ,这个方法里面就是取出任务执行的逻辑
public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
|
这里弄清楚了一件事情,进入循环准备执行任务时,worker 加锁标记为非闲置,任务执行完毕或者出现异常,worker 释放锁,进入闲置状态。
也就是当一个 worker 执行任务前或者执行完任务,到取出下一个任务期间,都是闲置状态可以被打断
上面取出任务调用了 getTask() ,诶~为什么有一个死循环,别着急,慢慢看来。上面的代码可以知道如果 getTask 返回任务则执行,如果返回为 null 则 worker 需要被回收
private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
|
getTask() 方法逻辑也捋得差不多了,这里又出现了两个新的方法,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 和 workQueue.take() ,这两个都是阻塞队列的方法,来看看它们又各自是怎么实现的
E.LinkedBlockingQueue — 阻塞队列
ThreadPoolExecutor 使用的是链表结构的阻塞队列,实现了 BlockingQueue 接口,而 BlockingQueue 则是继承自 Queue 接口,再上层就是 Collection 接口。
因为本篇笔记主要是分析 ThreadPoolExecutor 的原理,所以不会详细介绍 LinkedBlockingQueue 中的其它代码,主要介绍这里所用的方法,首先来看一下上文所提到的 take()
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
|
上面的代码可以知道 take 方法会一直阻塞直到队列有新的任务为止
接下来是 poll 方法,可以看到几乎与 take 方法相同,唯一的区别是在阻塞的循环代码块里面加了时间判断,如果超时则直接返回为空,不会一直阻塞下去
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
|
线程池的回收及终止
前一节分析了任务的执行流程及原理,也留下了一个问题,worker 是如何被回收的呢?线程池该如何管理呢?回到上一节的 runWorker() 方法中,还记得最后调用了一个方法
processWorkerExit(w, completedAbruptly);
|
这个方法传入了两个参数,第一个是当前的 Woker ,第二个是标记异常退出的标识
首先判断是否为异常退出,如果是异常退出的话需要手动调整线程数量,如果是正常回收的,getTask 方法里面已经手动调整过了,不记得的小伙伴可以看看前文的代码,找找 decrementWorkerCount(),
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } addWorker(null, false); } }
|
上面的代码中调用了 tryTerminate() 方法,这个方法是用于终止线程池的,又是一个 for 循环,从代码结构来看是异常情况的重试机制。还是老方法,慢慢来看总共做了几件事情
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } } }
|
尝试终止线程池的代码分析完了,好像就结束了~但作为好奇宝宝,我们是不是应该看看如何打断闲置线程,以及 terminated 中做了什么呢?来吧,继续装逼
先来看打断线程
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
|
有同学开始装逼了,说我们是好奇宝宝,t.interrupt() 方法也应该看,嗯~没错,但这里是调用了 native 方法,会 c 的可以去看看装逼,我就算了~
好了,再来看看 terminate, 是不是很坑爹? terminated 里面神!马!也!没!干!。。。淡定,其实这个方法类似于 Activity 的生命周期方法,允许你在被终止时做一些事情,默认的线程池没有什么要做的事情,当然什么也没写啦~
* Method invoked when the Executor has terminated. Default * implementation does nothing. Note: To properly nest multiple * overridings, subclasses should generally invoke * {@code super.terminated} within this method. */ protected void terminated() { }
|
异常处理
还记得前面讲到,出现各种异常情况,添加队列失败等等,只是笼统的说了一句扔掉,当然代码实现不可能是简单一句扔掉就完了。回到 execute() 方法中找到 reject() 任务,看看究竟是怎么处理的
final void reject(Runnable command) { handler.rejectedExecution(command, this); }
|
还记得在创建线程池的时候,初始化了一个 handler — RejectedExecutionHandler
这是一个接口,只有一个方法,接收两个参数
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
|
既然是一个接口,那么肯定有他的实现类,我们先不急着看所有实现类,先来看看这里的 handler 可能是什么,记得在使用 Executors 获取线程池调用构造方法的时候并没有传入 handler 参数,那么 ThreadPoolExecutor 应该会有一个默认的 handler
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
|
默认 handler 是 AbortPolicy ,这个类实现了 rejectedExecution() 方法,抛了一个 Runtime 异常,也就是说当任务添加失败,就会抛出异常。这个类在 AsyncTask 引发了一场血案~所以在 API19 以后修改了 AsyncTask 的部分代码逻辑,这里就不细说啦,会在下一篇 AsyncTask 的笔记中分析。
实际上,在 ThreadPoolExecutor 中除了 AbortPolicy 外还实现了三种不同类型的 handler
- CallerRunsPolicy — 在 线程池没有 shutdown 的前提下,会直接在执行 execute 方法的线程里执行这个任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
|
- DiscardPolicy — 啥也不干,默默地丢掉任务~不信你看
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
|
- DiscardOldestPolicy — 丢弃掉队列中未执行的,最老的任务,也就是任务队列排头的任务,然后再试图在执行一次
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
|
总结
其实我不想做任何概念性的总结,原因是我之前没有开始学习源码的时候也看过很多源码分析的文章,大部分文章都会总结一些概念,这些概念本身可能是没有错的,起码是作者自己对源码的理解,但是文字所传达的思想真的是有限的,有时候因为概念的模糊,反而会被带入一个误区,并且长时间的无法转变。
我自己一开始对线程池的理解其实是有偏差的,宏观上可能没有大的问题,但在细节上有很大的误区,通过自己耐心的阅读源码分析后学习到了很多东西。
非要总结的话就给一点我阅读源码的小思路吧:
- 一定要使用过,起码能完整的使用。如果没有用过很难把流程捋清楚
- 从使用的角度作为突破口,一步步的去寻找线索
- 一开始看不需要每一句都弄得很清楚,比如一个方法,应该先搞清楚这个方法里面做了几件事,核心的逻辑是什么
- 在捋清了整体逻辑后,再去看细节上的实现
- 实在无法理解的内容,再看看别人的文章,因为有了源码的基础,再看别人的文章能够有自己的思路
- 与你的好基友探讨,你会发现每个人有不同的角度去理解源码,找到最合适你的那一种
以上是我的一点拙见。
最后感谢我的好基友 — 阿语,在与他的探讨中我走出了误区并有了很多新的理解。