【并发】Java并发线程池底层原理详解与源码分析(下)

您所在的位置:网站首页 android线程池详解 【并发】Java并发线程池底层原理详解与源码分析(下)

【并发】Java并发线程池底层原理详解与源码分析(下)

2023-04-09 15:07| 来源: 网络整理| 查看: 265

【并发】Java并发线程池底层原理详解与源码分析(下)

前情回顾

上篇文章地址

遗留问题解析

手动实现线程池代码

运行结果

!!!先说结论!!!

图解 ThreadPoolExecutor  

ThreadPoolExecutor 源码分析

线程池源码结构

【面试题】 execute()方法与submit()方法有什么区别???

ThreadPoolExecutor 变量详解 

ThreadPoolExecutor 之 execute()方法详解 

(1)添加核心线程执行任务

(2)任务入队(队列) 

(3)添加临时线程执行任务,如果失败则执行拒绝策略  

【问】这里为什么需要二次确认???

ThreadPoolExecutor 之 addWorker()方法详解  

(1)goto部分 做“check”

(2)“干事情”

ThreadPoolExecutor 之 runWorker()方法详解 

执行优先级源码实现

线程复用机制源码实现 

【并发】Java并发线程池底层原理详解与源码分析(下) 前情回顾 上篇文章地址

【并发】Java并发线程池底层原理详解与源码分析(上)_面向鸿蒙编程的博客-CSDN博客线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。Executors 返回的线程池对象的弊端:1) FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。2.CachedThreadPool:允许的创建线程数量为 MAX_VALUE,可能会创建大量建大量的线程,从而导致 OOMhttps://blog.csdn.net/weixin_43715214/article/details/128045130

遗留问题解析 手动实现线程池代码 public class ThreadPoolDemo { public static void main(String[] args) { // 自定义线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(10)); for (int i = 1; i 再装满队列 -> 最后是临时线程

ThreadPoolExecutor 运行完整的流程

ThreadPoolExecutor 源码分析

接下来,我将会结合线程池ThreadPoolExecutor的源码来解析这个问题!!! 

线程池源码结构

 顶层接口是Executor,它只有一个方法execute();ExecutorService接口继承了Executor接口,它主要是提供了submit()。

【面试题】 execute()方法与submit()方法有什么区别???

submit的底层还是调用了execute

(1)关于返回值的问题

submit:有返回值,返回值(包括异常)被封装于FutureTask对象。适用于有返回结果的任务。execute:void类型的函数,没有返回值,适用于没有返回的任务。

(2)关于异常处理的问题

submit:submit的时候并不会抛出异常(此时线程可能处于就绪状态)。只有在get操作的时候会抛出。因为get操作会阻塞等待线程的执行完毕。execute:在执行的时候会直接抛出。可以通过实现 UncaughtExceptionHandler接口来完成异常的捕获。 ThreadPoolExecutor 变量详解  // 记录线程池的状态信息,高三位是状态信息,其余位表示工作的worker数量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 表示worker数量 private static final int COUNT_BITS = Integer.SIZE - 3; // worker容量 private static final int CAPACITY = (1 = shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty()) if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 判断容量 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 尝试通过CAS方式,如果前面没有问题,这里自增+1 if (compareAndIncrementWorkerCount(c)) // 自增后退出循环,执行后面逻辑! break retry; c = ctl.get(); // 如果线程池状态发生变化,重新从最外层循环 if (runStateOf(c) != rs) continue retry; } }

判断是否要开临时线程

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false;

这个代码有一点绕!翻译过来其实就是:

rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty()) 

rs > shutdown:线程池状态处于STOP,TIDYING,TERMINATED时,添加工作线程失败,不接受新任务rs >= shutdown && firstTask != null:线程池状态处于 SHUTDOWN,STOP,TIDYING,TERMINATED状态且worker的首个任务不为空时,添加工作线程失败,不接受新任务。rs >= shutdown && workQueue.isEmppty:线程池状态处于 SHUTDOWN,STOP,TIDYING,TERMINATED状态且阻塞队列为空时,添加工作线程失败,不接受新任务。

当满足这3种情况的时候,就不要 “加 Worker” 了!!!

判断容量

int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; 工作线程数量是否超过可表示的最大容量(CAPACITY)如果添加核心线程,是否超过最大核心线程容量(corePoolSize)如果添加临时线程,是否超过线程池最大线程容量(maximumPoolSize) 

如果容量符合“标准”,就会执行后面的CAS操作;如果CAS执行成功,退出循环,进入第二阶段!

private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } (2)“干事情” // 注意 Worker代表线程,Task表示任务 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //创建Worker对象实例 w = new Worker(firstTask); //获取Worker对象里的线程 final Thread t = w.thread; if (t != null) { //开启可重入锁,独占 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //获取线程池运行状态 int rs = runStateOf(ctl.get()); //满足 rs < SHUTDOWN 判断线程池是否是RUNNING,或者 //rs == SHUTDOWN && firstTask == null 线程池如果是SHUTDOWN, //且首个任务firstTask为空, if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); //将Worker实例加入线程池workers workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //线程添加成功标志位 -> true workerAdded = true; } } finally { //释放锁 mainLock.unlock(); } //如果worker实例加入线程池成功,则启动线程,同时修改线程启动成功标志位 -> true if (workerAdded) { // 执行 start 会调 run方法!!! t.start(); workerStarted = true; } } } finally { if (! workerStarted) //添加线程失败 回滚 addWorkerFailed(w); } return workerStarted;

最核心的部分如下所示 

try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 当前线程还没有启动,但是它却是存活状态,就会抛异常!!! if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } 首先检查线程池的状态,当线程处于 RUNNING 状态 或者 线程处于 SHUTDOWN 状态且当前线程的 firstTask 为空,满足以上条件时才能将 worker 实例添加进线程池,即workers.add(w);同时修改 largestPoolSize,largestPoolSize变量用于记录目前最大线程数。将标志位 workerAdded 设置为 true,表示添加线程成功。无论成功与否,在 finally 中都必须执行 mainLock.unlock()来释放锁。 if (workerAdded) { t.start(); workerStarted = true; }

当上面的条件都满足的时候,会执行 start() 方法,会调 run方法!!! 

ThreadPoolExecutor 之 runWorker()方法详解 

在线程池中,其实还有两个概念——提交优先级 和 执行优先级 

上面的源码分析,其中 exectue() 和 addWorker() 是提交部分;而这里的 runWorker() 是执行部分

在上面的addWorker()方法中调了start()方法,JVM会通过 start()来调用run()方法!!!自然最后会调到runWorker()

if (workerAdded) { // 执行 start 会调 run方法!!! t.start(); workerStarted = true; } public void run() { runWorker(this); } final void runWorker(ThreadPoolExecutor.Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { // 执行优先级!!! while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } 执行优先级源码实现 while (task != null || (task = getTask()) != null) {...}

为什么说这里可以将执行优先级体现得“淋漓尽致”呢???

这个task表示任务,当 task 为null的时候,才会执行 getTask() 即从队列中获取任务,并交给线程处理!

这里又一次的说明了 addWorker(null,false) 的作用!!! 

所以,总的来说,这里的逻辑是核心线程和临时线程先处理完自己手头上的任务后,才会去线程池里拿!!! 

beforeExecute(wt, task); afterExecute(task, thrown);

上面的代码中可以看到有beforeExecute、afterExecute,它们都是钩子函数,可以分别在子类中重写它们用来扩展 ThreadPoolExecutor,例如添加日志、计时、监视或者统计信息收集的功能。

线程复用机制源码实现  processWorkerExit(w, completedAbruptly); private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } // 线程复用!!! addWorker(null, false); } }

在 runWorker() 代码的最后的finally里面,会调 processWorkerExit() 方法,这个方法非常的重要!因为线程池的复用机制就是在这里体现的!!!

最后这里调了 addWorker(null, false) 表示会创建一个新的线程(但是没有任务),其实就是表示当前线程将自己之前手头上的活处理完了,现在又可以接活了!!!



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3