ThreadPoolExecutor 源码阅读
- 转载:www.GuoXiongfei.cn
- /
- 时间:2019-02-12 09:01:23
- /
- 浏览:93,942次
- /
- 分类:css ,html ,javascript ,Jquery ,前端开发 ,本站主题
目录ThreadPoolExecutor源码阅读Executor框架ExecutorExecutorServiceAbstractExecutorService构造器Executors创建线程池状态Worker与任务调度提交任务线程池关闭ThreadPoolExecutor源码阅读读了一下ThreadPoolExecutor的源码(JDK11),简单的做个笔记.Executor框架Executor...
目录
- ThreadPoolExecutor 源码阅读
- Executor 框架
- Executor
- ExecutorService
- AbstractExecutorService
- 构造器
- Executors创建线程池
- 状态
- Worker 与任务调度
- 提交任务
- 线程池关闭
- Executor 框架
ThreadPoolExecutor 源码阅读
读了一下 ThreadPoolExecutor 的源码(JDK 11), 简单的做个笔记.
Executor 框架
Executor
Executor
接口只有一个方法:
public interface Executor { void execute(Runnable command);}
Executor
接口提供了一种将任务提交和任务执行机制解耦的方法. Executor
的实现并不须要是异步的.
ExecutorService
ExecutorService
在 Executor
的基础上, 提供了一些管理终止的方法和可以生成 Future
来跟踪一个或多个异步任务的进度的方法:
shutdown()
方法会启动比较柔和的关闭过程, 并且不会阻塞.ExecutorService
将会继续执行已经提交的任务, 但不会再接受新的任务. 如果ExecutorService
已经被关闭, 则不会有附加的操作.shutdownNow()
方法会尝试停止正在执行的任务, 不再执行等待执行的任务, 并且返回等待执行的任务列表, 不会阻塞. 这个方法只能尝试停止任务, 典型的取消实现是通过中断来取消任务, 因此不能响应中断的任务可能永远不会终止.invokeAll()
方法执行给定集合中的所有任务, 当所有任务完成时返回Future
的列表, 支持中断. 如果在此操作正在进行时修改了给定的集合,则此方法的结果未定义.invokeAny()
方法会执行给定集合中的任务, 当有一个任务完成时, 返回这个任务的结果, 并取消其他未完成的任务, 支持中断. 如果在此操作正在进行时修改了给定的集合,则此方法的结果未定义.
AbstractExecutorService
AbstractExecutorService
提供了一些 ExecutorService
的执行方法的默认实现. 这个方法使用了 newTaskFor()
方法返回的 RunnableFuture
(默认是 FutureTask
) 来实现 submit()
、invokeAll()
、 invokeAny()
方法.
RunnableFuture
继承了 Runnable
和 Future
, 在 run()
方法成功执行后, 将会设置完成状态, 并允许获取执行的结果:
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run();}
FutureTask
FutureTask
实现了 RunnableFuture
接口, 表示一个可取消的计算任务, 只能在任务完成之后获取结果, 并且在任务完成后, 就不再能取消或重启, 除非使用 runAndReset()
方法.
FutureTask
有 7 个状态:
- NEW
- COMPLETING
- NORMAL
- EXCEPTIONAL
- CANCELLED
- INTERRUPTING
- INTERRUPTED
可能的状态转换:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
FutureTask
在更新 state 、 runner、 waiters 时, 都使用了 VarHandle.compareAndSet()
:
// VarHandle mechanicsprivate static final VarHandle STATE;private static final VarHandle RUNNER;private static final VarHandle WAITERS;static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATE = l.findVarHandle(FutureTask.class, “state“, int.class); RUNNER = l.findVarHandle(FutureTask.class, “runner“, Thread.class); WAITERS = l.findVarHandle(FutureTask.class, “waiters“, WaitNode.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } // Reduce the risk of rare disastrous classloading in first call to // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class<?> ensureLoaded = LockSupport.class;}protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = v; STATE.setRelease(this, NORMAL); // final state finishCompletion(); }}
来看一下 get()
方法:
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s);}private int awaitDone(boolean timed, long nanos) throws InterruptedException { long startTime = 0L; WaitNode q = null; boolean queued = false; for (;;) { int s = state; if (s > COMPLETING) {// 已经在终结状态, 返回状态if (q != null) q.thread = null;return s; } else if (s == COMPLETING)// 已经完成了, 但是状态还是 COMPLETINGThread.yield(); else if (Thread.interrupted()) {// 检查中断removeWaiter(q);throw new InterruptedException(); } else if (q == null) {// 没有创建 WaitNode 节点, 如果 timed 并且 nanos 大于 0, 创建一个 WaitNodeif (timed && nanos <= 0L) return s;q = new WaitNode(); } else if (!queued)// 将新的 WaitNode 放到链表头部, 并尝试 cas 到 waitersqueued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); else if (timed) {final long parkNanos;if (startTime == 0L) { // first time startTime = System.nanoTime(); if (startTime == 0L) startTime = 1L; parkNanos = nanos;} else { long elapsed = System.nanoTime() - startTime; if (elapsed >= nanos) { // 超时了 removeWaiter(q); return state; } // park 的时间 parkNanos = nanos - elapsed;}// nanos 比较慢, 再次检查, 然后阻塞if (state < COMPLETING) LockSupport.parkNanos(this, parkNanos); } else// 不需要超时的阻塞LockSupport.park(this); }}
再来看下 run()
方法:
public void run() { if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) // 不在 NEW 状态, 或者 runner 不为 null return; try { // callable 是在构造器中指定的或用 Executors.callable(runnable, result) 创建的 Callable<V> c = callable; if (c != null && state == NEW) {V result;boolean ran;try { result = c.call(); ran = true;} catch (Throwable ex) { result = null; ran = false; // 设置异常状态和异常结果 setException(ex);}if (ran) // 正常完成, 设置完成状态和结果 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s); }}protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = v; STATE.setRelease(this, NORMAL); // final state finishCompletion(); }}private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (WAITERS.weakCompareAndSet(this, q, null)) {// cas 移除 waiters, 对链表中的每个 Node 的线程 un