ThreadPoolExecutor 源码阅读

目录ThreadPoolExecutor源码阅读Executor框架ExecutorExecutorServiceAbstractExecutorService构造器Executors创建线程池状态Worker与任务调度提交任务线程池关闭ThreadPoolExecutor源码阅读读了一下ThreadPoolExecutor的源码(JDK11),简单的做个笔记.Executor框架Executor...

ThreadPoolExecutor 源码阅读

目录

  • ThreadPoolExecutor 源码阅读
    • Executor 框架
      • Executor
      • ExecutorService
      • AbstractExecutorService
    • 构造器
      • Executors创建线程池
    • 状态
    • Worker 与任务调度
    • 提交任务
    • 线程池关闭

ThreadPoolExecutor 源码阅读

读了一下 ThreadPoolExecutor 的源码(JDK 11), 简单的做个笔记.

Executor 框架

Executor

Executor 接口只有一个方法:

public interface Executor { void execute(Runnable command);}

Executor 接口提供了一种将任务提交和任务执行机制解耦的方法. Executor 的实现并不须要是异步的.

ExecutorService

ExecutorServiceExecutor 的基础上, 提供了一些管理终止的方法和可以生成 Future 来跟踪一个或多个异步任务的进度的方法:

  • shutdown() 方法会启动比较柔和的关闭过程, 并且不会阻塞. ExecutorService 将会继续执行已经提交的任务, 但不会再接受新的任务. 如果 ExecutorService 已经被关闭, 则不会有附加的操作.
  • shutdownNow() 方法会尝试停止正在执行的任务, 不再执行等待执行的任务, 并且返回等待执行的任务列表, 不会阻塞. 这个方法只能尝试停止任务, 典型的取消实现是通过中断来取消任务, 因此不能响应中断的任务可能永远不会终止.
  • invokeAll() 方法执行给定集合中的所有任务, 当所有任务完成时返回 Future 的列表, 支持中断. 如果在此操作正在进行时修改了给定的集合,则此方法的结果未定义.
  • invokeAny() 方法会执行给定集合中的任务, 当有一个任务完成时, 返回这个任务的结果, 并取消其他未完成的任务, 支持中断. 如果在此操作正在进行时修改了给定的集合,则此方法的结果未定义.

AbstractExecutorService

AbstractExecutorService 提供了一些 ExecutorService 的执行方法的默认实现. 这个方法使用了 newTaskFor() 方法返回的 RunnableFuture (默认是 FutureTask ) 来实现 submit()invokeAll()invokeAny() 方法.

RunnableFuture 继承了 RunnableFuture , 在 run() 方法成功执行后, 将会设置完成状态, 并允许获取执行的结果:

public interface RunnableFuture<V> extends Runnable, Future<V> { /**  * Sets this Future to the result of its computation  * unless it has been cancelled.  */ void run();}

FutureTask

FutureTask 实现了 RunnableFuture 接口, 表示一个可取消的计算任务, 只能在任务完成之后获取结果, 并且在任务完成后, 就不再能取消或重启, 除非使用 runAndReset() 方法.

FutureTask 有 7 个状态:

  • NEW
  • COMPLETING
  • NORMAL
  • EXCEPTIONAL
  • CANCELLED
  • INTERRUPTING
  • INTERRUPTED

可能的状态转换:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

FutureTask 在更新 state 、 runner、 waiters 时, 都使用了 VarHandle.compareAndSet() :

// VarHandle mechanicsprivate static final VarHandle STATE;private static final VarHandle RUNNER;private static final VarHandle WAITERS;static { try {  MethodHandles.Lookup l = MethodHandles.lookup();  STATE = l.findVarHandle(FutureTask.class, “state“, int.class);  RUNNER = l.findVarHandle(FutureTask.class, “runner“, Thread.class);  WAITERS = l.findVarHandle(FutureTask.class, “waiters“, WaitNode.class); } catch (ReflectiveOperationException e) {  throw new ExceptionInInitializerError(e); } // Reduce the risk of rare disastrous classloading in first call to // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class<?> ensureLoaded = LockSupport.class;}protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) {  outcome = v;  STATE.setRelease(this, NORMAL); // final state  finishCompletion(); }}

来看一下 get() 方法:

public V get(long timeout, TimeUnit unit)  throws InterruptedException, ExecutionException, TimeoutException { if (unit == null)  throw new NullPointerException(); int s = state; if (s <= COMPLETING &&  (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)  throw new TimeoutException(); return report(s);}private int awaitDone(boolean timed, long nanos) throws InterruptedException { long startTime = 0L;  WaitNode q = null; boolean queued = false; for (;;) {  int s = state;  if (s > COMPLETING) {// 已经在终结状态, 返回状态if (q != null) q.thread = null;return s;  }  else if (s == COMPLETING)// 已经完成了, 但是状态还是 COMPLETINGThread.yield();  else if (Thread.interrupted()) {// 检查中断removeWaiter(q);throw new InterruptedException();  }  else if (q == null) {// 没有创建 WaitNode 节点, 如果 timed 并且 nanos 大于 0, 创建一个 WaitNodeif (timed && nanos <= 0L) return s;q = new WaitNode();  }  else if (!queued)// 将新的 WaitNode 放到链表头部, 并尝试 cas 到 waitersqueued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);  else if (timed) {final long parkNanos;if (startTime == 0L) { // first time startTime = System.nanoTime(); if (startTime == 0L)  startTime = 1L; parkNanos = nanos;} else { long elapsed = System.nanoTime() - startTime; if (elapsed >= nanos) {  // 超时了  removeWaiter(q);  return state; } // park 的时间 parkNanos = nanos - elapsed;}// nanos 比较慢, 再次检查, 然后阻塞if (state < COMPLETING) LockSupport.parkNanos(this, parkNanos);  }  else// 不需要超时的阻塞LockSupport.park(this); }}

再来看下 run() 方法:

public void run() { if (state != NEW ||  !RUNNER.compareAndSet(this, null, Thread.currentThread()))  // 不在 NEW 状态, 或者 runner 不为 null  return; try {  // callable 是在构造器中指定的或用 Executors.callable(runnable, result) 创建的  Callable<V> c = callable;  if (c != null && state == NEW) {V result;boolean ran;try { result = c.call(); ran = true;} catch (Throwable ex) { result = null; ran = false; // 设置异常状态和异常结果 setException(ex);}if (ran) // 正常完成, 设置完成状态和结果 set(result);  } } finally {  // runner must be non-null until state is settled to  // prevent concurrent calls to run()  runner = null;  // state must be re-read after nulling runner to prevent  // leaked interrupts  int s = state;  if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s); }}protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) {  outcome = v;  STATE.setRelease(this, NORMAL); // final state  finishCompletion(); }}private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) {  if (WAITERS.weakCompareAndSet(this, q, null)) {// cas 移除 waiters, 对链表中的每个 Node 的线程 un
源文地址:https://www.guoxiongfei.cn/cntech/9869.html