Spring线程池ThreadPoolTaskExecutor的使用

您所在的位置:网站首页 spring线程池使用实例 Spring线程池ThreadPoolTaskExecutor的使用

Spring线程池ThreadPoolTaskExecutor的使用

2023-06-07 21:18| 来源: 网络整理| 查看: 265

目录 1 线程池简介 1.1 为什么使用线程池 1.2 线程池为什么需要使用队列 1.3 线程池为什么要使用阻塞队列而不使用非阻塞队列 1.4 如何配置线程池 1.5 execute()和submit()方法 1.6 Spring线程池 1.7 @Async调用中的事务处理机制 2 示例 2.1 线程池配置类 2.2 异步方法 2.3 启动测试

 

1 线程池简介 1.1 为什么使用线程池 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗; 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行; 方便线程并发数的管控,因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换(cpu切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场) 提供更强大的功能,延时定时线程池 1.2 线程池为什么需要使用队列

因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换。

创建线程池的消耗较高或者线程池创建线程需要获取mainlock这个全局锁,影响并发效率,阻塞队列可以很好的缓冲

1.3 线程池为什么要使用阻塞队列而不使用非阻塞队列

阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源,当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。使得在线程不至于一直占用cpu资源。(线程执行完任务后通过循环再次从任务队列中取出任务进行执行,代码片段如:while (task != null || (task = getTask()) != null) {})。

不用阻塞队列也是可以的,不过实现起来比较麻烦而已,有好用的为啥不用呢

1.4 如何配置线程池

CPU密集型任务尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换

IO密集型任务可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间

混合型任务可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效因为如果划分之后两个任务执行时间有数据级的差距,那么拆分没有意义。因为先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失

1.5 execute()和submit()方法 execute(),执行一个任务,没有返回值 submit(),提交一个线程任务,有返回值

submit(Callable task)能获取到它的返回值,通过future.get()获取(阻塞直到任务执行完)。一般使用FutureTask+Callable配合使用submit(Runnable task, T result)能通过传入的载体result间接获得线程的返回值。submit(Runnable task)则是没有返回值的,就算获取它的返回值也是null

Future.get()方法会使取结果的线程进入阻塞状态,直到线程执行完成之后,唤醒取结果的线程,然后返回结果

1.6 Spring线程池

Spring 通过任务执行器(TaskExecutor)来实现多线程和并发编程,使用ThreadPoolTaskExecutor实现一个基于线程池的TaskExecutor,还得需要使用@EnableAsync开启异步,并通过在需要的异步方法那里使用注解@Async声明是一个异步任务Spring 已经实现的异常线程池:

SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。 SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方 ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类 SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类 ThreadPoolTaskExecutor :最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装 1.7 @Async调用中的事务处理机制

点击了解使用@Async使用的事务问题

2 示例 2.1 线程池配置类 package cn.jzh.thread; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; @Configuration @ComponentScan("cn.jzh.thread") @EnableAsync //开启异步操作 public class TaskExecutorConfig implements AsyncConfigurer { /** * 通过getAsyncExecutor方法配置ThreadPoolTaskExecutor,获得一个基于线程池TaskExecutor * * @return */ @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); pool.setCorePoolSize(5);//核心线程数 pool.setMaxPoolSize(10);//最大线程数 pool.setQueueCapacity(25);//线程队列 pool.initialize();//线程初始化 return pool; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; } }

配置类中方法说明:Spring 中的ThreadPoolExecutor是借助JDK并发包中的java.util.concurrent.ThreadPoolExecutor来实现的。其中一些值的含义如下:

int corePoolSize:线程池维护线程的最小数量 int maximumPoolSize:线程池维护线程的最大数量,线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务。 long keepAliveTime:空闲线程的存活时间TimeUnit unit:时间单位,现由纳秒,微秒,毫秒,秒 BlockingQueue workQueue:持有等待执行的任务队列 RejectedExecutionHandler handler 线程池的拒绝策略,是指当任务添加到线程池中被拒绝,而采取的处理措施。当任务添加到线程池中之所以被拒绝,可能是由于:第一,线程池异常关闭。第二,任务数量超过线程池的最大限制。Reject策略预定义有四种: ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃. ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃. ThreadPoolExecutor.DiscardOldestPolicy策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程) 自定义策略:当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务 2.2 异步方法

@Async注解可以用在方法上,表示该方法是个异步方法,也可以用在类上,那么表示此类的所有方法都是异步方法异步方法会自动注入使用ThreadPoolTaskExecutor作为TaskExecutor

package cn.jzh.thread; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import java.util.concurrent.Future; @Service public class AsyncTaskService { /** * * @param i */ @Async public void executeAsync(Integer i) throws Exception{ System.out.println("线程ID:" + Thread.currentThread().getId() + "线程名字:" +Thread.currentThread().getName()+"执行异步任务:" + i); } @Async public Future executeAsyncPlus(Integer i) throws Exception { System.out.println("线程ID:" + Thread.currentThread().getId() +"线程名字:" +Thread.currentThread().getName()+ "执行异步有返回的任务:" + i); return new AsyncResult("success:"+i); } } 2.3 启动测试 package cn.jzh.thread; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.concurrent.Future; public class MainApp { public static void main(String[] args) throws Exception{ System.out.println("主线程id:" + Thread.currentThread().getId() + "开始执行调用任务..."); AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TaskExecutorConfig.class); AsyncTaskService service = context.getBean(AsyncTaskService.class); for (int i = 0;i


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3