Java基础——线程池原理与使用

在日常开发中,大家会经常遇到一个任务不需要关心执行结果,只需要异步地执行即可,或者一个大任务需要拆解成多个小任务分别执行,执行完后再汇总返回的情景,这时候就需要线程池来实现这种功能了。下面分别从常见的线程池面试点、什么是线程池、线程池种类、线程池生命周期以及线程池使用及实现、线程池使用情景阐述,有误之处望多多海涵、互相交流。


1、常见面试点

  • 为什么建议自定义线程池参数
  • 如何实现一个线程池
  • 线程池有几种任务拒绝策略

2、什么是线程池

线程池是预先生成N个线程,有任务提交时把任务放进任务队列中,并交付给空闲的线程处理,若当前没有空闲线程则根据设定的策略来处理已提交的任务,处理完任务后不会销毁线程,进而达到复用、减少频繁创建、销毁线程的效果。

 1public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
 2            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,
 3                          RejectedExecutionHandler handler) {
 4if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)
 5            throw new IllegalArgumentException();
 6if (workQueue == null || threadFactory == null || handler == null)
 7      throw new NullPointerException();
 8    this.corePoolSize = corePoolSize;
 9    this.maximumPoolSize = maximumPoolSize;
10    this.workQueue = workQueue;
11    this.keepAliveTime = unit.toNanos(keepAliveTime);
12    this.threadFactory = threadFactory;
13    this.handler = handler;
14}

3、线程池种类

  • FixedThreadPool:这是一个线程数固定的线程池,即corePoolSize与maximumPoolSize为固定值,keepAliveTime为0,使用无边限的LinkedBlockingQueue,当线程数被创建时即已经创建好固定的线程数,即使新增或减少任务,线程数也是固定不变的,适合于整体上需要的线程数变化不大的情景。
  • SingleThreadExecutor:这是单个线程数的线程池,即corePoolSize及maximumPoolSize数都设置为1,keepAliveTime为0,使用无边限的LinkedBlockingQueue,对提交的任务顺序执行,更具准确性。
  • CachedThreadPool:这是一个线程数可伸缩、任务队列无边限的线程池,即corePoolSize设置为0,maximumPoolSize为Integer最大值,keepAliveTime设置60秒,使用SynchronousQueue作为任务队列,当新增任务时会先检查是否有空闲线程,若没有则会新建线程处理任务当空闲线程超过keepAliveTime后则会进行销毁回收。
  • ScheduledThreadPoolExecutor:这是一个核心线程数固定、定时执行任务的线程池,即corePoolSize为固定值,maximumPoolSize为Integer的最大值keepAliveTime为0,使用延迟队列DelayedQueue进行调度任务的执行。
  • ForkJoinPool:这是一个拆分、聚合任务的线程池,使用分而治之算法拆分为多个子任务,异步或同步执行任务,适合计算密集型情景使用。

4、线程池生命周期

  • running: 接受新任务并处理队列的任务。
  • shutdown: 不再接受新任务,但会处理队列中的任务。
  • stop: 不再接受新任务,也不处理队列的任务同时中断正在执行中的任务。
  • terminated: terminated() 方法已执行完。
  • tidying: 全部任务都已终止且工作线程数为零,线程池即将向tidying状态过度,即将运行terminated()方法。

状态转换

  • running->shutdown:shutdown()方法被调用。
  • running或shutdown->stop:shutdownNow()方法被调用时。
  • shutdown->tidying:当线程数为零且任务队列为空时。
  • stop->tidying:当线程数为零的时。
  • tidying->terminated:terminated()方法执行完。

5、线程池使用与实现

 1 public static void main(String[] args) {
 2   Runnable task = new Runnable() {
 3     @Override
 4     public void run(){
 5       System.out.println("run a task");
 6     }
 7   };
 8   //定义一个有10个线程的线程池
 9   ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); 
10   fixedThreadPool.submit(task)
11   //定义一个只有一个线程的线程池
12   ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
13   singleThreadPool.submit(task)
14   //定义一个线程数可伸缩的线程池
15   ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
16   cachedThreadPool.submit(task)
17   //定义一个可拆分任务的线程池
18   ExecutorService workStealPool = Executors.newWorkStealingPool();
19  }

注意事项:不建议直接使用上述方式构造线程池,原因是容易出现内存溢出、无法创建新线程等问题,其中固定线程池及单线程池使用的是无边限任务队列,可能会出现任务一直增多导致队列占用更多的内存,最终引起频繁GC或者内存不足抛出异常等问题,另外无边限线程池则可能会一直无限制的创建线程,我们都知道线程是珍贵资源,在JVM默认参数-Xss的配置中,新建一个线程大概占用1M内存左右,一直创建线程会占用更多的内存导致内存溢出、程序卡顿等问题,故建议根据业务情景自定义线程池。

  1//ExecutorService核心方法
  2public Future<?> submit(Runnable task) {
  3        if (task == null) throw new NullPointerException();
  4        RunnableFuture<Void> ftask = newTaskFor(task, null);
  5        execute(ftask);
  6        return ftask;
  7}
  8
  9//ThreadPoolExecutor核心方法
 10public void execute(Runnable command) {
 11  if (command == null)
 12                throw new NullPointerException();
 13  int c = ctl.get(); //获取线程池的状态
 14  //1、判断当前空闲线程数是否小于核心线程数,若是则新建核心线程并把任务设为首个任务
 15  if (workerCountOf(c) < corePoolSize) {
 16    //若addWorker返回false直接结束,addWork方法将会原子性地检查runState、workCount
 17    if (addWorker(command, true))
 18      return;
 19    c = ctl.get();
 20  }
 21  //2、判断线程池是否处于运行状态及是否成功添加任务到队列中
 22  if (isRunning(c) && workQueue.offer(command)) {
 23    int recheck = ctl.get();
 24    /*
 25            2.1 重新检查状态,有可能最后一次检查时存在的线程已被销毁或进入此方法时线程池被关闭
 26            成功添加任务后双重检查线程池的状态,若线程池状态为STOP,则移除队列中的任务并执行拒绝策略
 27            */
 28    if (! isRunning(recheck) && remove(command))
 29      reject(command);
 30    //2.2 线程池处于运行状态且当前空闲线程数为零则创建非核心线程
 31    else if (workerCountOf(recheck) == 0)
 32      addWorker(null, false);
 33  }
 34  //3、如果添加任务失败则尝试创建非核心线程,若创建失败,有可能线程池已被关闭或队列已饱和,拒绝任务
 35  else if (!addWorker(command, false))
 36    reject(command);
 37}
 38
 39//添加线程
 40private boolean addWorker(Runnable firstTask, boolean core) {
 41  retry:
 42  //CAS方式添加线程
 43  for (;;) {
 44    int c = ctl.get();
 45    int rs = runStateOf(c);
 46    //线程池处于非运行状态 and !(线程池处于关闭状态 and 任务为空 and 任务队列非空)
 47    if (rs >= SHUTDOWN &&
 48        ! (rs == SHUTDOWN &&
 49           firstTask == null &&
 50           ! workQueue.isEmpty()))
 51      return false;
 52    //通过添加
 53    for (;;) {
 54      int wc = workerCountOf(c);
 55      //判断当前线程数是否大于最大线程数限制,若超过则返回false
 56      if (wc >= CAPACITY ||
 57          wc >= (core ? corePoolSize : maximumPoolSize))
 58        return false;
 59      //通过cas更新当前线程数+1
 60      if (compareAndIncrementWorkerCount(c))
 61        break retry;
 62      c = ctl.get();  // Re-read ctl
 63      //若cas操作失败则重新读取当前线程数,并判断当前线程池的状态是否被更新
 64      if (runStateOf(c) != rs)
 65        continue retry;
 66      // else CAS failed due to workerCount change; retry inner loop
 67    }
 68  }
 69
 70  boolean workerStarted = false;
 71  boolean workerAdded = false;
 72  Worker w = null;
 73  try {
 74    w = new Worker(firstTask);
 75    final Thread t = w.thread;
 76    if (t != null) {
 77      final ReentrantLock mainLock = this.mainLock;
 78      //获取锁并添加新的线程到workers集合中
 79      mainLock.lock();
 80      try {
 81        // Recheck while holding lock.
 82        // Back out on ThreadFactory failure or if
 83        // shut down before lock acquired.
 84        int rs = runStateOf(ctl.get());
 85        //由于ThreadFactory创建线程时失败或获到锁前线程池已被SHUTDOWN,故重新检查线程池状态
 86        if (rs < SHUTDOWN ||
 87            (rs == SHUTDOWN && firstTask == null)) {
 88          if (t.isAlive()) // precheck that t is startable(判断创建的线程是否存活)
 89            throw new IllegalThreadStateException();
 90          workers.add(w);
 91          int s = workers.size();
 92          if (s > largestPoolSize)
 93            largestPoolSize = s;
 94          //添加成功后更新添加标志位
 95          workerAdded = true;
 96        }
 97      } finally {
 98        mainLock.unlock();
 99      }
100      //线程创建成功后,启动线程并更新启动标志位
101      if (workerAdded) {
102        t.start();
103        workerStarted = true;
104      }
105    }
106  } finally {
107    //若线程启动或创建失败,执行失败逻辑
108    if (! workerStarted)
109      addWorkerFailed(w);
110  }
111  return workerStarted;
112}
113
114private void addWorkerFailed(Worker w) {
115  final ReentrantLock mainLock = this.mainLock;
116  mainLock.lock();
117  try {
118    if (w != null)
119      workers.remove(w);  //从集合中移除线程
120    decrementWorkerCount(); //原子性更新线程数
121    tryTerminate();  // 尝试终止线程池
122  } finally {
123    mainLock.unlock();
124  }
125}

6、线程池使用情景

具体线程池的参数配置是根据业务的特性来决定的,以下有不同的任务特性:

  • 并发高、任务执行时间短的业务,线程池的线程数可设置为CPU核心数+1,减少线程上下文频繁切换。
  • 并发不高、任务执行时间很长的业务,根据以下情景进行设置。
    • 若是IO密集型任务,主要消耗在IO操作上而不会占用过多的CPU资源,故可调大线程数增加CPU的利用率
    • 若是CPU密集型任务,则耗时在CPU计算里,需要避免线程上下文的频繁切换,可参考第一种情景配置线程数
  • 并发高、任务执行时间也长的业务,这已不是通过调整线程池数能解决的,需要考虑系统的架构设计、缓存使用、中间件性能调优等方面。

除了根据任务的特性之外,还需要根据每秒任务数tasksInSecond(100-500)、每个任务大概执行时间taskCost(0.5s)、系统允许最大执行时间maxCost等计算。

  • 核心线程数: 每秒任务数/每个线程每秒处理能力,即tasksInSecond/(1/taskCost)
  • 任务队列大小:每秒线程处理任务数系统允许最大执行时间,即(corePool/taskCost) * maxCost
  • 最大线程数:(最大任务数-队列容量)/每个线程每秒处理能力 ,即(maxTasksInSecond-queue)/(1/taskCost)
  • keepAliveTime:一般使用默认提供的空闲时间即可
  • 线程工厂:一般使用默认提供即可,若需要更改线程优先级、线程前缀名称可自定义
  • 任务策略的配置需根据业务情景来决定,比如任务是否可丢失,若不可丢失可增加持久性的手段进行保存