神刀安全网

线程池ThreadPoolExecutor源码解析

本文主要内容

  • 什么是线程池
  • 线程池的使用
  • 线程池的原理
  • 线程池中的位运算
  • 源码解析

什么是线程池

如果频繁创建线程,也是会影响整体资源或效率的,线程池的产生是为了避免重复的创建线程和回收线程。线程池有以下几个作用:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建、销毁线程造成的消耗
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果入限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。

线程池的使用

先来看看线程池的核心构造方法:

public ThreadPoolExecutor(int corePoolSize,                           int maximumPoolSize,                           long keepAliveTime,                           TimeUnit unit,                           BlockingQueue<Runnable> workQueue,                           ThreadFactory threadFactory,                           RejectedExecutionHandler handler) {     //异常处理略过     this.corePoolSize = corePoolSize;     this.maximumPoolSize = maximumPoolSize;     this.workQueue = workQueue;     this.keepAliveTime = unit.toNanos(keepAliveTime);     this.threadFactory = threadFactory;     this.handler = handler; } 

需要用户指定以下几个关键变量:

  • corePoolSize:核心线程数量
  • maximumPoolSize:最大线程数量
  • allowCoreThreadTimeOut:是否允许线程超时(设置为true时与keepAliveTime,TimeUnit一起起作用)
  • keepAliveTime:线程存活时间(当线程池允许线程超时且运行中的线程数量超过- corePoolSize时,会按照此变量设置时间关闭线程)
  • TimeUnit:单位(一般与keepAliveTime同时使用,供线程池判断是否满足关闭线程的条件)
  • workQueue:缓冲队列
  • RejectedExecutionHandler:拒绝处理任务类(默认:AbortPolicy 会抛异常,见下方实例)
  • threadFactory:线程工厂(默认:DefaultThreadFactory)

明白这些核心数据的作用,就可以随意使用线程池了

线程池的原理

如果让我们自己来设计一个线程池,我们应该怎么做呢?线程池最大的功能在于线程重复使用,不需要每执行一个任务就重新构造新线程。

如果用户提交的任务比较多,显示我们需要一个缓存队列用于存储任务。

为了达到线程重复使用的目的,线程应该不停地从缓存队列中获取新的任务,并且执行它。

线程池ThreadPoolExecutor源码解析

线程池中的位运算

线程池中使用一个AtomicInteger对象来表征线程池的状态和大小,为了达到1个int型数据能保存2个数据的目的,源码采用了非常精妙的位运算来实现。

//线程池的各个状态 private static final int RUNNING    = -1 << COUNT_BITS; private static final int SHUTDOWN   =  0 << COUNT_BITS; private static final int STOP       =  1 << COUNT_BITS; private static final int TIDYING    =  2 << COUNT_BITS; private static final int TERMINATED =  3 << COUNT_BITS; //两个常量 private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //获取状态、线程池大小的方法以及一个或方法 private static int runStateOf(int c)     { return c & ~CAPACITY; } private static int workerCountOf(int c)  { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } //关键变量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 

从源码可知,其实RUNNING等5个状态的值的前4位分别是:

  • 1110 、0000、 0010 、0100、 0110

各状态后面接了28个0。

CAPACITY的值,前4位为0,后28位为1。

如果要获取线程池状态,则调用runStateOf此方法,ctl的值将与 ~CAPACITY 作与操作,得到的正是ctl值的前4位。

获取当前线程池大小,则是与CAPACITY 作与操作,正是取ctl的低位值。

所以 ctl 的高位值用于表征线程池状态,而低们值用于表征线程池的大小。

ctl的初始化,调用ctlOf方法,正是拿RUNNING与0作或操作,结果还是RUNNING,所以可知,在线程池初始化的时候,默认ctl的状态值为RUNNING,而线程池大小为0。

ctlOf方法的含义也可知,正是组合状态值与线程池大小。

源码解析

一般来说,线程池添加任务有两种方式,一种是execute方法,另一种是submit方法,submit方法其实也是会调用execute,所以在此只研究execute方法即可。

public void execute(Runnable command) {     if (command == null)         throw new NullPointerException();     //获取线程池状态     int c = ctl.get();     //如果线程池大小少于核心线程,则直接添加新的Worker并返回     if (workerCountOf(c) < corePoolSize) {         if (addWorker(command, true))             return;         c = ctl.get();     }     //如果核心线程数量已满,则考虑将任务添加进入缓存队列,然后再次检查状态     if (isRunning(c) && workQueue.offer(command)) {         int recheck = ctl.get();         if (! isRunning(recheck) && remove(command))             reject(command);         else if (workerCountOf(recheck) == 0)             addWorker(null, false);     }     //最后核心线程数量已满,缓存队列已满,那么直接交给拒绝策略处理     else if (!addWorker(command, false))         reject(command); } 

再来看非常关键的addWorker方法:

private boolean addWorker(Runnable firstTask, boolean core) {     retry:     for (;;) {         int c = ctl.get();         //获取线程池状态         int rs = runStateOf(c);         // 对状态进行检测,如果线程池已经被调用shutDown方法时,返回false         if (rs >= SHUTDOWN &&             ! (rs == SHUTDOWN &&                firstTask == null &&                ! workQueue.isEmpty()))             return false;          for (;;) {             //获取线程池的大小             int wc = workerCountOf(c);             //如果线程池大小已经超过核心线程池大小,那么也直接返回,这种情况应该把任务缓存到队列中去             if (wc >= CAPACITY ||                 wc >= (core ? corePoolSize : maximumPoolSize))                 return false;             //正常情况下,线程池大小加1即可             if (compareAndIncrementWorkerCount(c))                 break retry;             c = ctl.get();  // Re-read ctl             if (runStateOf(c) != rs)                 continue retry;             // else CAS failed due to workerCount change; retry inner loop         }     }     //新建一个Worker,在执行Worker构造方法时,创建了新的线程,即下面的t,具体可参见Worker的构造方法。     Worker w = new Worker(firstTask);     Thread t = w.thread;      final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {         // Recheck while holding lock.         // Back out on ThreadFactory failure or if         // shut down before lock acquired.         int c = ctl.get();         int rs = runStateOf(c);          if (t == null ||             (rs >= SHUTDOWN &&              ! (rs == SHUTDOWN &&                 firstTask == null))) {             decrementWorkerCount();             tryTerminate();             return false;         }              workers.add(w);          int s = workers.size();         if (s > largestPoolSize)             largestPoolSize = s;     } finally {         mainLock.unlock();     }     //启动线程     t.start();     // It is possible (but unlikely) for a thread to have been     // added to workers, but not yet started, during transition to     // STOP, which could result in a rare missed interrupt,     // because Thread.interrupt is not guaranteed to have any effect     // on a non-yet-started Thread (see Thread#interrupt).     if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())         t.interrupt();      return true; } 

在addWorker方法中,启动了一个新的线程,那我们来看看线程中的run方法,其实即是runWorker方法:

final void runWorker(Worker w) {     Runnable task = w.firstTask;     w.firstTask = null;     boolean completedAbruptly = true;     try {         //请注意,这是一个while循环,如果当前Worker的task不为空,则取当前Worker的task,执行任务         //如果当前task为空,则从缓存队列中取任务来执行,这就是线程池中最重要概念线程复用的体现         while (task != null || (task = getTask()) != null) {             w.lock();             clearInterruptsForTaskRun();             try {                 beforeExecute(w.thread, task);                 Throwable thrown = null;                 try {                     //执行任务                     task.run();                 } catch (RuntimeException x) {                     thrown = x; throw x;                 } catch (Error x) {                     thrown = x; throw x;                 } catch (Throwable x) {                     thrown = x; throw new Error(x);                 } finally {                     afterExecute(task, thrown);                 }             } finally {                 task = null;                 w.completedTasks++;                 w.unlock();             }         }         completedAbruptly = false;     } finally {         processWorkerExit(w, completedAbruptly);     } } 

我们来看看getTask方法是否真的是从缓存队列中取任务:

private Runnable getTask() {     boolean timedOut = false; // Did the last poll() time out?      retry:     for (;;) {         int c = ctl.get();         int rs = runStateOf(c);          // Check if queue empty only if necessary.         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {             decrementWorkerCount();             return null;         }          boolean timed;      // Are workers subject to culling?          for (;;) {             int wc = workerCountOf(c);             timed = allowCoreThreadTimeOut || wc > corePoolSize;             //正常情况下,线程池数量应该少于最大值,并且也不会timedOut,             //如果真的大于了最大值,则应该删除一个线程             if (wc <= maximumPoolSize && ! (timedOut && timed))                 break;             if (compareAndDecrementWorkerCount(c))                 return null;             c = ctl.get();  // Re-read ctl             if (runStateOf(c) != rs)                 continue retry;             // else CAS failed due to workerCount change; retry inner loop         }          try {             //从缓存队列中取出任务             Runnable r = timed ?                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                 workQueue.take();             if (r != null)                 return r;             timedOut = true;         } catch (InterruptedException retry) {             timedOut = false;         }     } } 

线程池分析到这,基本结束了,内中最多的细节,比如同步锁的使用,比如状态的判断,比如最后停止线程池等等,不再细说了,线程池介绍到这基本原理都清楚了,有点类似于android中的Looper,死循环中取消息,分发消息并执行对应操作等,线程池也是这样。

线程池中的位运算,刚开始可能看不懂,但在纸上写下各个数,基本就能明白各个方法的含义了。

最后再补充一个小细节,负数在计算机中的表示,为了更方便实现2进制数的加减法,负数使用补码表示,具体则是正数部分取反加1,而正数的补码则是正数本身。正因为如此,RUNNING 前4位才是1110,具体的各位可以计算下,补码的意义可以自行搜索。

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » 线程池ThreadPoolExecutor源码解析

分享到:更多 ()