怎么通过线程名找到指定的线程池

您所在的位置:网站首页 获取线程池 怎么通过线程名找到指定的线程池

怎么通过线程名找到指定的线程池

2024-07-11 23:43| 来源: 网络整理| 查看: 265

一、引言

一直以来都想写一个关于线程池源码的分析,但是这里面涉及的知识点很多没办法一次写完,因此一拖再多,现在实在不想再拖延下去了,总结一下线程池一些问题。如有不当的地方请各位同行及时指出,谢谢各位。

二、线程池的参数介绍

java.uitl.concurrent.ThreadPoolExecutor类就是我们构造线程池的核心类,首先看一下这个类的构造。

1、ThreadPoolExecutor的UML图 4671c44f66807059391f550c3b2317d4.png

从UML图中我们可以看出ThreadPoolExecutor继承自抽象类AbstractExecutorService实现ExecutorService接口继承自Executor接口 。

public class ThreadPoolExecutor extends AbstractExecutorService { public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler); } //最终调用的构造方法 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize = SHUTDOWN,也就是此时已经执行了shutdown操作,可能的状态为有SHUTDOWN、STOP、TIDYING、TERMINATED * 第二个条件:!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()) * 等价于(!(rs == SHUTDOWN) || !(firstTask == null) || workQueue.isEmpty())只要有个一个为true就成立 * 1.!(rs == SHUTDOWN) 为true表示rs!=SHUTDOWN的状态,也就是线程时处于SHUTDOWN、STOP、TIDYING、TERMINATED,不需要创建worker线程了 * 2.(firstTask != null) 为true表示提交任务过程中,线程池刚好处于SHUTDOWN状态,不需要创建worker线程了 * 3.workQueue.isEmpty() 为true表示任务队列已经空了,不需要创建worker线程了 */ //(6)检查队列是否是SHUTDOWN以上的状态 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; //(7)循环CAS增加线程个数 for (; ; ) { int wc = workerCountOf(c);//获取当前的工作线程个数 //(7.1)如果线程个数超过限制则返回false,即添加工作线程失败 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //(7.2)CAS操作workCount+1,如果+1成功则跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get();//重新读取ctl的值 //(7.3)CAS操作失败了,看线程池状态是否发生变化了,变化了则跳到外层循环重新获取线程池状态,否则内存循环重新获取CAS if (runStateOf(c) != rs) continue retry; } } //(8)能到这里说明CAS成功了 boolean workerStarted = false;//worker线程是否启动成功标志 boolean workerAdded = false;//是否已经将worker线程添加到workers这个HashSet集合中 Worker w = null; try { //(8.1)创建worker线程 w = new Worker(firstTask);//定义一个worker线程,这个woker线程绑定的是提交的task任务 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //(8.2)加独占锁,为了workers同步,因为可能多个线程调用了线程池的execute()方法 mainLock.lock(); try { //(8.3)重新检查线程池状态,为了避免在ThreadFactory失败或者在获取锁之前调用了shutdown方法 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive())//预先检查t是否可启动 throw new IllegalThreadStateException(); //(8.4)将worker线程添加到集合中 workers.add(w); int s = workers.size(); if (s > largestPoolSize)//largestPoolSize记录最大活跃线程数 largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //(8.5)添加成功则启动任务 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //(8.6)如果启动失败了,则回退操作 if (!workerStarted) addWorkerFailed(w); } return workerStarted; } 4、线程池中worker的执行

当我们完成了提交任务的过程的时候,下面一步就是worker线程是如何执行任务的。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { Worker(Runnable firstTask) { setState(-1); //在线程执行runWorker之前禁止被中断 this.firstTask = firstTask;//外部提交的任务 this.thread = getThreadFactory().newThread(this);//真实的执行任务的线程 }}

从Worker类中我们可以看出它是实现了Runnable接口,并且是AQS的子类,那么我们可以推测出它能够进行并发的控制(lock、unlock)。runWorker的实现如下:

final void runWorker(Worker w) { Thread wt = Thread.currentThread();//在woker的流程中执行worker.start()之后真实调用的方法 Runnable task = w.firstTask;//获取当前worker携带的task任务 w.firstTask = null; /** * 这里直接调用了unlock方法,但是我们并没有看到调用lock方法,说明unlock之前不一定需要lock */ //(9)state设置成0,将占用锁的线程设置为null(第一次执行之前没有线程占用) w.unlock(); boolean completedAbruptly = true; try { //(10.0)自旋。先执行自己携带的任务,然后从阻塞队列中获取一个任务直到无法获取任务 while (task != null || (task = getTask()) != null) { //(10.1)将satte设置为1,设置占有锁的线程为自己 w.lock(); //(10.2)检查线程池状态,如果状态为STOP以上(STOP以上不执行任务),并且当前线程还未被中断则中断当前线程。 //第二次检查状态是为了处理shutdownNow操作 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //(10.3)这是⼀个钩⼦⽅法,留给需要的⼦类实现 beforeExecute(wt, task); Throwable thrown = null; try { //(10.4)执⾏任务⽅法,若任务执⾏发⽣异常,当前worker不会再继续执⾏任务,线程销毁,后续会新增⼀个线程进⾏补偿 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //(10.5)钩⼦⽅法,执⾏任务处理后逻辑,如异常处理 afterExecute(task, thrown); } } finally { task = null; //(10.6)任务执⾏异常,也认为执行完毕,进行任务数量统计 w.completedTasks++; w.unlock(); } } /** * 执行到这里代表while循环结束了。worker线程正常结束了 * 1.workQueue中没有任务了(poll超时,或者使用take时通过shutdown、shutdownNow中断了) * 2.worker线程执行时没有task出现异常,否则也会跳出循环 */ //(10.7)执行到这里代表非核心线程在keepAliveTime内无法获取任务而退出 completedAbruptly = false; } finally { //(11)从上面可以看出如果实际业务(task任务)出现异常会导致当前worker终止 //completedAbruptly此时为true代表worker突然完成,不是正常退出 processWorkerExit(w, completedAbruptly); } }

其中processWorkerExit方法的源码如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) //如果是task异常导致结束,workerCount数需要减1 decrementWorkerCount(); //(11.1)统计整个线程池完成的任务个数,并从works集合中删除当前worker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //(11.2)尝试设置线程状态为TERMINATED //条件一:如果当前是SHUTDWON状态并且工作队列为空 //或者 //条件二:当前状态是STOP状态并且当前线程池里面没有活动线程 tryTerminate(); //(11.3)如果当前线程个数小于核心线程数,则新增worker线程 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && !workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } } 5、线程池获取任务

worker线程会通过自旋方式一直循环获取task任务,先执行自己携带的任务,如果自己携带的任务为空则从阻塞队列中获取任务。

//(10)自旋。先执行自己携带的任务,然后从阻塞队列中获取一个任务直到无法获取任务 while (task != null || (task = getTask()) != null){......}

其中,从阻塞队列中获取任务的getTask方法源码如下:

private Runnable getTask() { boolean timedOut = false;//最近一次poll()超时了吗? //(10.0.1)自旋获取任务(因为是多线程环境) for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); /** * 1.线程池状态是SHUTDOWN状态并且任务队列是空 * 2.线程池是STOP以上的状态 * (10.0.2)满足以上两个条件则workerCount数-1,并且返回null从而保证获取任务的worker进行正常退出 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); /** * 1.允许核心线程退出 * 2.当前线程数量超过corePoolSize核心线程数 * (10.0.3)这时获取任务的机制切换为poll(keepAliveTime) */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /** * 1、线程数大于maximumPoolSize(什么时候会出现这种情况? 当maximumPoolSize初始设置为0或者其他线程通过set方法对其进行修改) * 2、线程数未超过maximumPoolSize但是timed为true(允许核心线程退出或者线程数量超过核心线程)并且上次获取任务超时(没获取到任务,我们推测本次依旧会超时) * 3、在满足条件1或者条件2的情况下进行check:运行线程数大于1或者任务队列没有任务 * (10.0.4)满足以上条件执行worker线程数减1操作,并且返回null从而保证获取任务的worker进行正常退出 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //(10.0.5)如果允许超时退出,则调用poll(keepAliveTime)获取任务,否则则通过tack()一直阻塞等待直到有任务提交到队列 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //(10.0.6)当等待超过keepAliveTime时间未获取到任务时,标记为true。在下次自旋时会进入销毁流程 timedOut = true; } catch (InterruptedException retry) { //(10.0.7)什么时候会抛出异常?当调用shutdown或者shutdownNow方法触发worker内的Thread调用interrupt方法时会执行到此处 timedOut = false; } } } 6、线程池关闭操作

线程池提供了两种关闭线程池的方法:

shutdown():调用后,不可以再 submit 新的 task,已经 submit 的将继续执行。shutdwonNow():调用后,试图停止当前正在执行的 task,并返回尚未执行的task的列表。6.1、 调用shutdown方法 public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//利用排它锁进行上锁,保证只有一个线程执行关闭流程 try { //(12)权限检查 checkShutdownAccess(); //(13)内部通过自旋+CAS修改线程池状态为SHUTDOWN advanceRunState(SHUTDOWN); //(14)遍历所有的worker,进行中断通知 interruptIdleWorkers(); onShutdown();//关闭线程池时调用的钩子函数 } finally { mainLock.unlock(); } //(15)进行最后的整理工作,尝试状态变为TERMINATED tryTerminate(); } //如果当前状态>=SHUTDOWN则直接返回,否则设置当前状态为SHUTDOWN private void advanceRunState(int targetState) { for (; ; ) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } //设置所有空闲线程的中断标志 private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; //如果工作线程没有被中断,并且没有正在运行则设置中断 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } } //尝试终止线程池 final void tryTerminate() { for (; ; ) { int c = ctl.get(); /** * 1.处于RUNNING状态 * 2.处于TIDYING、TERMINATED状态(已经终止过) * 3.处于SHUTDOWN状态但是workQueue不为空,还有任务未处理 * (15.1)满足以上任何1种条件线程池不能被终止 */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) return;//当前线程池不能被为终止 /** * 1.处于SHUTDOWN状态并且workQueue为空,或者STOP状态 * 2.如果此时线程池还有线程(正在执行任务,正在等待任务),中断1个空闲的worker线程 * (15.2)如果还有worker线程,只中断1个线程并返回 */ if (workerCountOf(c) != 0) { //有资格终止 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //(15.3)当前已经没有运行态的线程了,将线程池状态设置为TIDYING,workerCount=0 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //(15.4)钩子方法,待子类实现 terminated(); } finally { //(15.5)将线程池状态设置为TERMINATED,workerCount=0 ctl.set(ctlOf(TERMINATED, 0)); //(15.5)将线程池状态设置为TERMINATED后唤醒awaitTermination操作阻塞的线程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } } 代码(15)判断如果当前线程池状态是SHUTDOWN状态并且工作队列为空或者当前是STOP状态当前线程池里面没有活动线程则设置线程池状态为TERMINATED,如果设置为了TERMINATED状态还需要调用条件变量termination的signalAll()方法激活所有因为调用线程池的awaitTermination()方法而被阻塞的线程。6.2、 调用shutdownNow方法调用shutdownNow()后,线程池就不会再接受新的任务,并且丢弃工作队列里面里面的任务,正在执行的任务会被中断,该方法是立刻返回的,并不等待激活的任务执行完成再返回。返回值为这时候队列里面被丢弃的任务列表。代码如下: public ListshutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //(16)权限检查 checkShutdownAccess(); //(17)设置线程池状态为STOP advanceRunState(STOP); //(18)中断所有线程 interruptWorkers(); //(19)移动任务队列到tasks中 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } //中断所有的worker线程,包含空闲线程和正在执行任务的线程 private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } 7、awaitTermination操作线程池调用awaitTermination(long timeout, TimeUnit unit)方法后,当前线程会被阻塞,直到线程池状态变为了TERMINATED才返回,或者等待时间超时才返回,整个过程独占锁,代码如下: public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (; ; ) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos corePoolSize) interruptIdleWorkers(); else if (delta > 0) { // We don't really know how many new threads are "needed". // As a heuristic, prestart enough new workers (up to new // core size) to handle the current number of tasks in // queue, but stop if queue becomes empty while doing so. int k = Math.min(delta, workQueue.size()); while (k-- > 0 && addWorker(null, true)) { if (workQueue.isEmpty()) break; } } }setMaximumPoolSize(int maximumPoolSize):设置线程池最大容量。 public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize maximumPoolSize) interruptIdleWorkers(); } 当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。 五、线程池状态转换

首先,总结一下线程池的状态(这里的状态值取移位之前的)。

4668376f689f4b767a7abd16e068af25.png

线程池状态

b913e48ceb38a082019d26b81a6d1134.png 六、总结

如何选择线程池数量? 影响线程池大小的因素: CPU的数量、内存大小、 任务计算密集型还是IO密集型等。 牛人总结的线程池计算公式如下:

NCPU = CPU的数量UCPU = 期望对CPU的使用率 0 ≤ UCPU ≤ 1W/C = 等待时间与计算时间的比率如果希望处理器达到理想的使用率,那么线程池的最优大小为: 线程池大小=NCPU *UCPU(1+W/C) 一般需要根据任务的类型来配置线程池大小:如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1如果是IO密集型任务,参考值可以设置为2*NCPU具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3