Java并发包中线程池ThreadPoolExecutor原理探究

一、线程池简介线程池的使用主要是解决两个问题:①当执行大量异步任务的时候线程池能够提供更好的性能,在不使用线程池时候,每当需要执行异步任务的时候直接new一个线程来运行的话,线程的创建和销毁都是需要开销的。而线程池中的线程是可复用的,不需要每次执行异步任务的时候重新创建和销毁线程;②线程池提供一种资源限制和管理的手段,比如可以限制线程的个数,动态的新增线程等等。在下面的分析中,我们可以看到,线程池...

Java并发包中线程池ThreadPoolExecutor原理探究

一、线程池简介

  线程池的使用主要是解决两个问题:①当执行大量异步任务的时候线程池能够提供更好的性能,在不使用线程池时候,每当需要执行异步任务的时候直接new一个线程来运行的话,线程的创建和销毁都是需要开销的。而线程池中的线程是可复用的,不需要每次执行异步任务的时候重新创建和销毁线程;②线程池提供一种资源限制和管理的手段,比如可以限制线程的个数,动态的新增线程等等。

  在下面的分析中,我们可以看到,线程池使用一个Integer的原子类型变量来记录线程池状态和线程池中的线程数量,通过线程池状态来控制任务的执行,每个工作线程Worker线程可以处理多个任务。

二、ThreadPoolExecutor类

1、我们先简单看一下关于ThreadPoolExecutor的一些成员变量以及其所表示的含义

  ThreadPoolExecutor继承了AbstractExecutorService,其中的成员变量ctl是一个Integer类型的原子变量,用来记录线程池的状态和线程池中的线程的个数,类似于前面讲到的读写锁中使用一个变量保存两种信息。这里(Integer看做32位)ctl高三位表示线程池的状态,后面的29位表示线程池中的线程个数。如下所示是ThreadPoolExecutor源码中的成员变量

 1 //(高3位)表示线程池状态,(低29位)表示线程池中线程的个数; 2 // 默认状态是RUNNING,线程池中线程个数为0 3 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 4  5 //表示具体平台下Integer的二进制位数-3后的剩余位数表示的数才是线程的个数; 6 //其中Integer.SIZE=32,-3之后的低29位表示的就是线程的个数了 7 private static final int COUNT_BITS = Integer.SIZE - 3; 8  9 //线程最大个数(低29位)00011111111111111111111111111111(1<<29-1)10 private static final int CAPACITY= (1 << COUNT_BITS) - 1;11 12 //线程池状态(高3位表示线程池状态)13 //111 0000000000000000000000000000014 private static final int RUNNING = -1 << COUNT_BITS;15 16 //000 0000000000000000000000000000017 private static final int SHUTDOWN=  0 << COUNT_BITS;18 19 //001 0000000000000000000000000000020 private static final int STOP =  1 << COUNT_BITS;21 22 //010 0000000000000000000000000000023 private static final int TIDYING =  2 << COUNT_BITS;24 25 //011 0000000000000000000000000000026 private static final int TERMINATED =  3 << COUNT_BITS;27 28 //获取高3位(运行状态)==> c & 1110000000000000000000000000000029 private static int runStateOf(int c)  { return c & ~CAPACITY; }30 31 //获取低29位(线程个数)==> c &  0001111111111111111111111111111132 private static int workerCountOf(int c)  { return c & CAPACITY; }33 34 //计算原子变量ctl新值(运行状态和线程个数)35 private static int ctlOf(int rs, int wc) { return rs | wc; }

  下面我们简单解释一下上面的线程状态的含义:

  ①RUNNING:接受新任务并处理阻塞队列中的任务

  ②SHUTDOWN:拒绝新任务但是处理阻塞队列中的任务

  ③STOP:拒绝新任务并抛弃阻塞队列中的任务,同时会中断当前正在执行的任务

  ④TIDYING:所有任务执行完之后(包含阻塞队列中的任务)当前线程池中活跃的线程数量为0,将要调用terminated方法

  ⑥TERMINATED:终止状态。terminated方法调用之后的状态

2、下面初步了解一下ThreadPoolExecutor的参数以及实现原理

  ①corePoolSize:线程池核心现车个数

  ②workQueue:用于保存等待任务执行的任务的阻塞队列(比如基于数组的有界阻塞队列ArrayBlockingQueue、基于链表的无界阻塞队列LinkedBlockingQueue等等)

  ③maximumPoolSize:线程池最大线程数量

  ④ThreadFactory:创建线程的工厂

  ⑤RejectedExecutionHandler:拒绝策略,表示当队列已满并且线程数量达到线程池最大线程数量的时候对新提交的任务所采取的策略,主要有四种策略:AbortPolicy(抛出异常)、CallerRunsPolicy(只用调用者所在线程来运行该任务)、DiscardOldestPolicy(丢掉阻塞队列中最近的一个任务来处理当前提交的任务)、DiscardPolicy(不做处理,直接丢弃掉)

  ⑥keepAliveTime:存活时间,如果当前线程池中的数量比核心线程数量多,并且当前线程是闲置状态,该变量就是这些线程的最大生存时间

  ⑦TimeUnit:存活时间的时间单位。

  根据上面的参数介绍,简单了解一下线程池的实现原理,以提交一个新任务为开始点,分析线程池的主要处理流程

3、关于一些线程池的使用类型

  ①newFixedThreadPool:创建一个核心线程个数和最大线程个数均为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE,keepAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲即回收。

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}

  ②newSingleThreadExecutor:创建一个核心线程个数和最大线程个数都为1 的线程池,并且阻塞队列长度为Integer.MAX_VALUE,keepAliveTime=0说明只要线程个数比核心线程个数多并且当前线程空闲即回收该线程。

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService  (new ThreadPoolExecutor(1, 1,  0L, TimeUnit.MILLISECONDS,  new LinkedBlockingQueue<Runnable>()));}

  ③newCachedThreadPoolExecutor:创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步队列(最多只有一个元素),keepAliveTime=60说明只要当前线程在60s内空闲则回收。这个类型的线程池的特点就是:加入同步队列的任务会被马上执行,同步队列中最多只有一个任务

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}

  4、ThreadPoolExecutor中的其他成员

  其中的ReentrantLock可参考前面写到的Java中的锁——Lock和synchronized,其中降到了ReentrantLock的具体实现原理;

  关于AQS部分可参考前面说到的Java中的队列同步器AQS,也讲到了关于AQS的具体实现原理分析;

  关于条件队列的相关知识可参考前面写的Java中的线程协作之Condition,里面说到了关于Java中线程协作Condition的实现原理;

//独占锁,用来控制新增工作线程Worker操作的原子性private final ReentrantLock mainLock = new ReentrantLock();//工作线程集合,Worker继承了AQS接口和Runnable接口,是具体处理任务的线程对象//Worker实现AQS,并自己实现了简单不可重入独占锁,其中state=0表示当前锁未被获取状态,state=1表示锁被获取,//state=-1表示Work创建时候的默认状态,创建时候设置state=-1是为了防止runWorker方法运行前被中断private final HashSet<Worker> workers = new HashSet<Worker>();//termination是该锁对应的条件队列,在线程调用awaitTermination时候用来存放阻塞的线程private final Condition termination = mainLock.newCondition();

三、execute(Runnable command)方法实现

  executor方法的作用是提交任务command到线程池执行,可以简单的按照下面的图进行理解,ThreadPoolExecutor的实现类似于一个生产者消费者模型,当用户添加任务到线程池中相当于生产者生产元素,workers工作线程则直接执行任务或者从任务队列中获取任务,相当于消费之消费元素。

 1 public void execute(
源文地址:https://www.guoxiongfei.cn/cntech/19309.html