工作中优雅的创建多线程(附带源码分析)

您所在的位置:网站首页 多线程分析 工作中优雅的创建多线程(附带源码分析)

工作中优雅的创建多线程(附带源码分析)

2023-07-02 06:11| 来源: 网络整理| 查看: 265

抄作业的直接去第三 一、创建的几种方式(看看就好,项目中都用线程池,没人单独创建,这里复习一下) 1、继承Thread

public class ThreadDemo extends Thread{ @Override public void run() { System.out.println("我继承了Thread"); } public static void main(String[] args) { ThreadDemo eh = new ThreadDemo(); eh.start(); } }

2、实现Runnable接口

public class ImRunableDemo { public static void main(String[] args) { Thread th = new Thread(new Runnable() { @Override public void run() { System.out.println("我实现了Runnable接口"); } }); th.start(); //看起来优雅 其实没啥区别 Thread th1 = new Thread(()->{ System.out.println("我实现了Runnable接口"); }); th1.start(); } }

优点:逻辑和数据更好的分离出来,且能处理共享资源,并且避免单继承的局限性。 3、使用Callable和FutureTask

public class CFDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { //FutureTask 实现了RunnableFuture接口, RunnableFuture继承了Runnable 相当于一个中间桥梁 把Callable传入到Thread FutureTask fk = new FutureTask( new Callable() { @Override public Integer call() throws Exception { System.out.println("我FutureTask"); return 11; } }); //参数只接受Runnable Thread th = new Thread(fk); th.start(); System.out.println(fk.get()); } }

优点:可以返回一个异步值给主线程,而前两个线程直接没有交互 4、线程池

public class ExecutorsDemo { private static ExecutorService pool = Executors.newFixedThreadPool(3); public static void main(String[] args) throws ExecutionException, InterruptedException { pool.execute(()->{ System.out.println("我实现了Runnable接口"); }); Future submit1 = pool.submit(()->{ System.out.println("我实现了Runnable接口"); } ); System.out.println("submit1:"+submit1.get()); Future submit = pool.submit(new Callable() { @Override public Integer call() throws Exception { System.out.println("我FutureTask"); return 11; } }); System.out.println("submit:"+submit.get()); } }

execute只接收Runnable类型的参数,且无返回值 submit可以接收Runnable类型的参数和Callable类型参数,且可以又返回值。

线程池的四种快捷方式:

public class ExecutorsPlusDemo { //单线程线程池 /*特点 * 1、按照提交顺序执行 * 2、唯一线程存活时间是无限的 * 3、唯一线程执行时,新任务会进入内部的阻塞队列中,阻塞队列是无界的 * */ private static ExecutorService newSinglePool = Executors.newSingleThreadExecutor(); //固定数量的线程池 /*特点 * 1、线程数量未达到固定数量,会创建新的直到达到固定数量 * 2、达到固定数量候会保持不变,如果一个线程异常结束,也会补充一个线程 * 3、线程达到固定数量时,新任务会进入内部的阻塞队列中,阻塞队列是无界的 * */ private static ExecutorService newFixedPool = Executors.newFixedThreadPool(3); //可缓存线程池 /*特点 * 1、有新任务,如果线程池中没有空闲线程,会新建一个线程 * 2、没有线程数量限制,大小取决于操作系统或者说取决于jvm * 3、如果有空闲线程,则会回收线程 * * 使用场景:可以削峰,适用于突发耗时较短的场景 * */ private static ExecutorService newCachedPool = Executors.newCachedThreadPool(); //可调度线程池 /*特点 * 1、可以提供延迟和周期性的线程 * */ private static ScheduledExecutorService newScheduledPool = Executors.newScheduledThreadPool(3); public static void main(String[] args) { /*for (int i = 0; i{ sleepSecond(3000); //System.out.println("newSinglePool"+new Date()); }); } newSinglePool.shutdown(); for (int i = 0; i{ sleepSecond(3000); //System.out.println("newFixedPool"+new Date()); }); } newFixedPool.shutdown();*/ newScheduledPool.scheduleAtFixedRate( ()->{ System.out.println("newScheduledPool "+new Date()); },//线程实例 0,//首次执行延迟 1,//两次开始执行的最小间隔 TimeUnit.SECONDS//时间间隔单位 ); sleepSecond(5000); newScheduledPool.shutdown(); } public static void sleepSecond(int i){ try { Thread.sleep(i); } catch (InterruptedException e) { e.printStackTrace(); } } }

二、线程池的标准创建(上述的不安全,很多都是无边界的创建线程) 上述的本质上还是通过ThreadPoolExecutor来创建的。 点开源码可以看到如下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) @param corePoolSize the number of threads to keep in the pool, even if they are idle, unless {@code allowCoreThreadTimeOut} is set @param maximumPoolSize the maximum number of threads to allow in the pool @param keepAliveTime when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating. @param unit the time unit for the {@code keepAliveTime} argument @param workQueue the queue to use for holding tasks before they are executed. This queue will hold only the {@code Runnable} tasks submitted by the {@code execute} method. @param handler the handler to use when execution is blocked because the thread bounds and queue capacities are reached

翻译成人话:

@param corePoolSize 核心线程,即使线程空闲也不会回收(工作线程没达到核心线程,会优先创建新的线程,而不是使用空闲线程) @param maximumPoolSize 线程上限 @param keepAliveTime 线程最大空闲时间 (如果超过这个时间,非核心线程会被回收,也可以设置的回收核心线程) @param unit 时间单位 @param workQueue 阻塞队列(核心线程在忙,就会放这个队列里面,如果满了,就会创建非核心线程,这个队列满了,再创建新线程,直到总数到达线程上限,新来的会执行拒绝策略) @param handler 拒绝策略

BlockingQueue 阻塞队列有以下几种:

//点进接口,点向下的箭头就能看到这几个实现类 ArrayBlockingQueue //必须设置大小,超出这个大小会创建新线程 直到超过最大线程数 LinkedBlockingDeque //吞吐大于上面的,可以设置大小,默认Interger最大值(无界队列),newSingleThreadExecutor newFixedThreadPool就是用的这个 PriorityBlockingQueue //具有优先级的队列 DelayQueue //无界阻塞延迟队列 每个元素都有过期时间,从这里获取的时候,过期的元素才会出队newScheduledThreadPool使用的这个 SynchronousQueue 同步队列 不存储元素 也就是来一个线程创建一个线程,如果没创建,就会阻塞后面的任务,直到创建成功 newCachedThreadPool

线程的钩子,挺好用

public class ThreadPoolExecutorGZDemo { public static void main(String[] args) throws InterruptedException { ExecutorService pool = new ThreadPoolExecutor(2,4,60,TimeUnit.SECONDS ,new LinkedBlockingQueue(2)){ @Override protected void terminated(){ System.out.println("调度器终止"); } @Override protected void beforeExecute(Thread t,Runnable target){ System.out.println("前钩"); super.beforeExecute(t,target); } @Override protected void afterExecute(Runnable target,Throwable t){ System.out.println("后钩"); super.afterExecute(target,t); } }; //执行 pool.execute(()->{ System.out.println("线程业务"); }); Thread.sleep(10000); pool.shutdown(); } }

拒绝策略 拒绝的两种情况1、达到最大数量 2、线程池关闭了 策略有以下几种:

AbortPolicy //新任务抛出异常 throw new RejectedExecutionException DiscardPolicy //新任务直接扔掉,是第一个的安静版本 DiscardOldestPolicy //扔掉最老的那个(队列中的元素) CallerRunsPolicy //自己执行 不使用线程池中的线程 自定义策略 继承RejectedExecutionHandler,可以自己试一下

三、优雅的创建线程池的因素 一般就三种情况: 1、io密集型任务(大部分时间在io操作,cpu空闲,因此可以多开几个,建议设置为cpu核数的两倍) 2、cpu密集型任务 (等于cpu就行) 3、混合型任务 最佳线程数 = (线程等待时间与线程cpu时间之比+1)* cpu核数 eg:cpu计算100ms db操作900 8核 则等于80

以下是贴出来的io的代码,其他的就是在线程数量上不同

public class ThreadUtilIO { //cpu核数 private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); //IO处理线程数 private static final int IO_MAX = Math.max(2,CPU_COUNT*2); //空闲时间 private static final int KEEP_ALIVE_SECONDS = 30; //有界队列size private static final int QUEUE_SIZE = 128; //懒汉式创建线程池:用于IO密集型任务 private static class IoIntenseTargetThreadPoolLazyHolder{ //线程池 private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor( IO_MAX, IO_MAX, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue(QUEUE_SIZE),//阻塞队列 先进先出 new CustomizableThreadFactory("io") ); static{ //当线程长时间空闲,可以自行销毁 EXECUTOR.allowCoreThreadTimeOut(true); //JVM关闭时的钩子函数 Runtime.getRuntime().addShutdownHook( new Thread(()->{ shutdownThreadPoolGracefully(EXECUTOR); }, "IO") ); } } //关闭线程池的方法 public static void shutdownThreadPoolGracefully(ExecutorService threadPool){ if(!(threadPool instanceof ExecutorService) || threadPool.isTerminated()){ return ; } try{ //不再接受新线程 threadPool.shutdown(); }catch (SecurityException e){ return; }catch (NullPointerException e){ return; } try { //等待60s,等待线程执行完 if(!threadPool.awaitTermination(60,TimeUnit.SECONDS)){ //取消正在执行的任务 threadPool.shutdownNow(); if(!threadPool.awaitTermination(60,TimeUnit.SECONDS)){ System.err.println("线程池任务未正常执行结束"); } } } catch (InterruptedException e) { threadPool.shutdownNow(); } try { if(!threadPool.isTerminated()){ for(int i=0; i break; } threadPool.shutdownNow(); } } } catch (InterruptedException e) { threadPool.shutdownNow(); }catch (Throwable e){ System.out.println(e.getMessage()); } } }

四、源码分析 阅读新感悟 1、新创建线程执行完本次的任务之后,会循环从队列中去取任务,直到队列中取不到runWorker() 2、根据1来看,执行完了就结束了,那么是怎么保证非核心销毁,核心线程保留? 逻辑在getTask()中

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) //线程池在运行状态 并且 加入队列成功 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//如果线程池不是运行状态 从队列移除任务成功 执行拒绝策略 reject(command); else if (workerCountOf(recheck) == 0) //如果在运行状态,并且目前线程数量为0 ,加一个null任务,让线程池创建线程(1、这个其实就是个安全检查,假如工作线程死,队列中的就永远执行不了了) addWorker(null, false); } else if (!addWorker(command, false))// 创建非核心 如果失败则拒绝 reject(command); } private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //整体就是检查一些状态 可不看 int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && //如果线程池状态停了 才会执行这个,都是一些状态检查,可不看 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //工作线程已经大于等于核心或者最大线程了 则添加失败 前者本应该进队列或者拒绝 后者本应该直接拒接策略的 但是使用了添加 if (compareAndIncrementWorkerCount(c)) //这个以及下面的那个方法 又返回到方法 break retry; 就是跳出外循环的意思 执行下面的语句 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); //创建线程 这个方法核心方法就这一个 下面主要还是更新一些状态 接下来是执行runWorker方法 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { //task 不等于空 则直接执行run 执行完成之后 置空 ,再此循环就直接取队列中的任务 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //核心线程的默认值allowCoreThreadTimeOut为false,如果允许销毁核心线程或者当前工作线程大于核心,则为true if ((wc > maximumPoolSize || (timed && timedOut)) //工作线程大于最大线程数或者(需要销毁线程并且超时) 第一次来timedOut是false,第一项也不可能大于最大 所以不执行这里 && (wc > 1 || workQueue.isEmpty())) { //工作线程大于1 或者队列是空 if (compareAndDecrementWorkerCount(c)) return null; continue; //继续循环 (队列有数据不会走这里) } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //在这个时间内能取出任务就好,取不出,阻塞这么一段时间就返回null workQueue.take(); //允许销毁 阻塞一个允许空闲时间 不允许的话取到任务执行 取不到就阻塞就阻塞在队列这里 if (r != null) return r; timedOut = true; //改成true 第二次就能判断是否允许销毁的标识了 } catch (InterruptedException retry) { timedOut = false; } } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3