线程池工具类封装(高级版)

您所在的位置:网站首页 自定义线程池怎么写 线程池工具类封装(高级版)

线程池工具类封装(高级版)

2024-07-14 22:54| 来源: 网络整理| 查看: 265

原文地址:https://blog.xaoxu.cn/archives/thread-pool-tool-package

为什么需要一个线程池工具类?

整个项目,用到线程执行任务的地方很多,不可能哪里用到就在那里直接 new 一个线程执行,这样资源得不到重复利用,一旦线程过多就会导致内存不足。

线程池的好处是什么?

使用线程池执行线程任务,当一个线程执行完成一个任务之后,线程资源回到线程池,资源得到重复利用。

线程池为什么使用自定义方式?

因为 java 自带线程池都会有可能造成内存不足的问题。自定义线程池,根据服务器配置定制线程池核心线程、最大线程等,是最好的方式。

我封装的线程池工具类有什么好处? 扩展性高可注解形式实现执行可根据业务需要注册不同的线程池,区分业务模块使用可以执行无返回值线程任务,可以执行有返回值的线程任务。 代码实现 创建一个线程任务类

该类主要用来承接 Runnable 方法,和其他业务相关需要的参数。

package com.scaffolding.example.threads; import com.scaffolding.example.threads.aop.Pooled; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; import java.util.concurrent.Executors; /** * 需要执行的线程任务 * * @author XiaoXuxuy * @date 2022/2/20 14:32 */ @Data @NoArgsConstructor @AllArgsConstructor public class Worker implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(Worker.class); // 默认超时时间 private static final long DEFAULT_TIMEOUT = 500; // 执行指令 private Runnable command; // 返回结果 private Result result = new Result(); // 超时 private long timeout; // 策略 private Pooled.PoolOverAct poolOverAct = Pooled.PoolOverAct.REJECT; // 预备执行时间 private volatile long prepareExecutionTime; // 开始执行时间 private volatile long startExecutionTime; // 结束执行时间 private volatile long endExecutionTime; // 执行的线程池名称 private String executorName; public Worker(Runnable command) { this.command = command; this.timeout = DEFAULT_TIMEOUT; } public Worker(Runnable command, Pooled.PoolOverAct poolOverAct) { this.command = command; this.timeout = DEFAULT_TIMEOUT; this.poolOverAct = poolOverAct; } public Worker(Runnable command, T result) { this.command = command; this.result = new Result(result); this.timeout = DEFAULT_TIMEOUT; } public Worker(Runnable command, T result, long timeout) { this.command = command; this.result = new Result(result); this.timeout = timeout; } @Override public void run() { startExecution(); try { command.run(); } finally { endExecution(); } } /** * 开始执行(预备执行耗时) */ private void startExecution() { this.startExecutionTime = System.currentTimeMillis(); LOGGER.info("POOL_DISPATCH_TIME, EXECUTOR: {}, TIME: {} ms", this.executorName, this.getPrepareTime()); } /** * 结束执行(执行耗时) */ private void endExecution() { this.endExecutionTime = System.currentTimeMillis(); LOGGER.info("POOL_EXECUTE_TIME, EXECUTOR: {}, TIME: {} ms", this.executorName, this.getExecutionTime()); } /** * 预备耗时 * * @return */ public long getPrepareTime() { return this.startExecutionTime - this.prepareExecutionTime; } /** * 执行耗时 * * @return */ public long getExecutionTime() { return this.endExecutionTime - this.startExecutionTime; } /** * callable执行线程 * * @return */ public Callable callable() { return Executors.callable(command, result); } public void setResult(T result) { if (null != this.result) { this.result.value = result; } } } 定义一个泛型结果类

承接具有执行结果的线程任务

package com.scaffolding.example.threads; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * 线程结果 * * @author XiaoXuxuy * @date 2022/2/20 14:33 */ @Data @NoArgsConstructor @AllArgsConstructor public class Result { public T value; } 定义线程池 package com.scaffolding.example.threads; import lombok.Data; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 任务线程池 * * @author XiaoXuxuy * @date 2022/2/19 19:34 */ @Data public class TaskToolExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(TaskToolExecutor.class); /** * 默认核心线程数 */ private static final int DEFAULT_CORE_SIZE = 20; /** * 默认最大线程数 */ private static final int DEFAULT_MAX_SIZE = 50; /** * 默认空闲线程存活时间 */ private static final long DEFAULT_ALIVE_TIME = 60; /** * 默认队列数量 */ private static final int DEFAULT_QUEUE_SIZE = 1024; /** * 线程池 */ private ExecutorService pool; /** * 线程工厂 */ private ThreadFactory threadFactory; /** * 任务较多时暂存队列 */ private BlockingQueue workQueue; /** * 核心线程数 */ private int coreSize; /** * 最大线程数 */ private int maxSize; /** * 空闲线程存活时间 */ private long aliveTime; /** * 队列数量 */ private int queueSize; /** * 线程池名称 */ private String name; /** * 初始化线程池 */ public void init() { if (null == pool) { if (null == workQueue) { queueSize = queueSize > 0 ? queueSize : DEFAULT_QUEUE_SIZE; workQueue = new LinkedBlockingQueue(queueSize); } if (null == threadFactory) { threadFactory = TaskToolExecutor.defaultThreadFactory(); } coreSize = coreSize > 0 ? coreSize : DEFAULT_CORE_SIZE; maxSize = maxSize > 0 ? maxSize : DEFAULT_MAX_SIZE; aliveTime = aliveTime > 0 ? aliveTime : DEFAULT_ALIVE_TIME; pool = new ThreadPoolExecutor(coreSize, maxSize, aliveTime, TimeUnit.SECONDS, workQueue, threadFactory); } } /** * 销毁线程池 */ public void destroy() { this.pool.shutdown(); } /** * 执行Task * * @param worker */ public void execute(Worker worker) { try { worker.setExecutorName(this.name); worker.setPrepareExecutionTime(System.currentTimeMillis()); pool.execute(worker); } catch (RejectedExecutionException e) { // 拒绝策略 dealWhenPoolFull(worker, e); } } /** * 提交Task,可获取线程返回结果 * * @param worker * @param * @return */ public T submit(Worker worker) { try { Future future = pool.submit(worker.callable()); Result result = future.get(worker.getTimeout(), TimeUnit.MILLISECONDS); return result.value; } catch (RejectedExecutionException e) { LOGGER.error("Rejected worker: Perhaps thread pool is full!", e); } catch (InterruptedException e) { LOGGER.error("Interrupted worker:", e); } catch (ExecutionException e) { LOGGER.error("Attempting to retrieve the result of a task that aborted!", e); } catch (TimeoutException e) { LOGGER.error("Timeout worker: get result timeout", e); } return worker.getResult().value; } /** * 线程池占满之后拒绝策略 * * @param worker * @param e */ private void dealWhenPoolFull(Worker worker, RejectedExecutionException e) { switch (worker.getPoolOverAct()) { case REJECT: LOGGER.error("Rejected worker: Perhaps thread pool is full!", e); break; case RUN: worker.run(); break; case BLOCK: try { workQueue.put(worker); } catch (InterruptedException interruptedException) { LOGGER.error("queue put worker: Perhaps block queue is full!", e); } break; case NEW_THREAD: Thread newThreadOutOfPool = threadFactory.newThread(worker); newThreadOutOfPool.setName("outOfPool-" + newThreadOutOfPool.getName()); newThreadOutOfPool.start(); break; default: LOGGER.error("Rejected worker: Perhaps thread pool is full!", e); break; } } /** * 默认线程工厂 */ static class DefaultThreadFactory implements ThreadFactory { static final AtomicInteger poolNumber = new AtomicInteger(1); final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "taskTool-" + poolNumber.getAndIncrement() + "-thread-"; } @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0); // 守护线程 if (thread.isDaemon()) thread.setDaemon(false); // 线程优先级 if (thread.getPriority() != Thread.NORM_PRIORITY) thread.setPriority(Thread.NORM_PRIORITY); return thread; } } public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } public void setQueueSize(int queueSize) { if (queueSize { System.out.println("我是任务" + i); countDownLatch.countDown(); }), Pooled.PoolOverAct.NEW_THREAD); } /** * 测试有返回结果线程 */ @Test public void testHasResultTask() { TaskToolExecutor executor = executorMap.get("ciToolExecutor"); final CountDownLatch countDownLatch = new CountDownLatch(1); Worker worker = createHasResultTestWorker(countDownLatch); Object result = executor.submit(worker); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程执行返回结果为: " + result); } private Worker createHasResultTestWorker(CountDownLatch countDownLatch) { Worker worker = new Worker(); Runnable runnable = () -> { int count = 1; // 设置返回值 worker.setResult(count); countDownLatch.countDown(); }; worker.setTimeout(500); worker.setCommand(runnable); return worker; } } 测试结果

无返回值情况:

2022-02-20 17:52:43 INFO POOL_DISPATCH_TIME, EXECUTOR: ciToolExecutor, TIME: 1 ms 2022-02-20 17:52:43 INFO POOL_DISPATCH_TIME, EXECUTOR: ciToolExecutor, TIME: 0 ms 2022-02-20 17:52:43 INFO POOL_DISPATCH_TIME, EXECUTOR: ciToolExecutor, TIME: 1 ms 我是任务1 我是任务0 我是任务2 2022-02-20 17:52:43 INFO POOL_EXECUTE_TIME, EXECUTOR: ciToolExecutor, TIME: 0 ms 2022-02-20 17:52:43 INFO POOL_EXECUTE_TIME, EXECUTOR: ciToolExecutor, TIME: 0 ms 所有线程执行完毕! 2022-02-20 17:52:43 INFO POOL_EXECUTE_TIME, EXECUTOR: ciToolExecutor, TIME: 0 ms

有返回值情况:

线程执行返回结果为: 1 注解方式实现 定义一个注解 package com.scaffolding.example.threads.aop; import java.lang.annotation.Retention; import java.lang.annotation.Target; import static java.lang.annotation.ElementType.METHOD; import static java.lang.annotation.RetentionPolicy.RUNTIME; /** * 线程池切面 * * @author XiaoXuxuy * @date 2022/2/20 15:35 */ @Target({METHOD}) @Retention(RUNTIME) public @interface Pooled { boolean async() default true; long timeout() default 500; String executor() default "ciToolExecutor"; PoolOverAct poolOverAct() default PoolOverAct.REJECT; enum PoolOverAct { REJECT, BLOCK, RUN, NEW_THREAD; } } 定义一个Aop切面 package com.scaffolding.example.threads.aop; import com.scaffolding.example.threads.TaskToolExecutor; import com.scaffolding.example.threads.Worker; import com.scaffolding.example.utils.PrimitiveUtils; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.util.Map; /** * @author XiaoXuxuy * @date 2022/2/20 15:35 */ @Aspect @Component @Order(AopOrder.POOLED_INVOKER_ORDER) public class PooledInvoker { private static final Logger LOGGER = LoggerFactory.getLogger(PooledInvoker.class); @Autowired private Map executorMap; @Around("@annotation(com.scaffolding.example.threads.aop.Pooled) && @annotation(pooled)") public Object around(final ProceedingJoinPoint pjp, Pooled pooled) { TaskToolExecutor executor = getExecutor(pooled.executor()); Object result = null; Worker worker = toWorker(pjp); worker.setPoolOverAct(pooled.poolOverAct()); if (pooled.async()) { executor.execute(worker); } else { worker.setTimeout(pooled.timeout()); result = executor.submit(worker); } if (null == result) { Class returnType = ((MethodSignature) pjp.getSignature()).getMethod().getReturnType(); if (returnType.isPrimitive()) { return PrimitiveUtils.getPrimitiveDefaultValue(returnType); } } return result; } private Worker toWorker(ProceedingJoinPoint pjp) { final Worker worker = new Worker(); Runnable command = () -> { try { worker.setResult(pjp.proceed()); } catch (Throwable e) { LOGGER.error("Error pooled execute:", e); } }; worker.setCommand(command); return worker; } private TaskToolExecutor getExecutor(String poolType) { return executorMap.get(poolType); } } 使用方式

只要在方法加入 @Pooled 注解即可利用线程池执行该方法。

@Pooled(executor = "msgExecutor", poolOverAct = Pooled.PoolOverAct.NEW_THREAD) public void notifyEmail() { System.out.println("我是通知线程"); } 附:Aop的拦截顺序定义 package com.scaffolding.example.threads.aop; import org.springframework.core.Ordered; /** * Aop的拦截顺序定义 * POOLED_INVOKER_ORDER > DISTRIBUTED_LOCK_ORDER > 日志 > 事务 * * @author XiaoXuxuy * @date 2022/2/20 15:38 */ public class AopOrder { public static final int DISTRIBUTED_LOCK_ORDER = Ordered.HIGHEST_PRECEDENCE + 4; public static final int POOLED_INVOKER_ORDER = Ordered.HIGHEST_PRECEDENCE + 2; } 附:Class原始类型工具类 package com.scaffolding.example.utils; import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class PrimitiveUtils { private static final ConcurrentMap NAME_CLASS_CACHE = new ConcurrentHashMap(); /** * void(V). */ public static final char JVM_VOID = 'V'; /** * boolean(Z). */ public static final char JVM_BOOLEAN = 'Z'; /** * byte(B). */ public static final char JVM_BYTE = 'B'; /** * char(C). */ public static final char JVM_CHAR = 'C'; /** * double(D). */ public static final char JVM_DOUBLE = 'D'; /** * float(F). */ public static final char JVM_FLOAT = 'F'; /** * int(I). */ public static final char JVM_INT = 'I'; /** * long(J). */ public static final char JVM_LONG = 'J'; /** * short(S). */ public static final char JVM_SHORT = 'S'; public static boolean isPrimitives(Class cls) { if (cls.isArray()) { return isPrimitive(cls.getComponentType()); } return isPrimitive(cls); } public static boolean isPrimitive(Class cls) { return cls.isPrimitive() || cls == String.class || cls == Boolean.class || cls == Character.class || Number.class.isAssignableFrom(cls) || Date.class.isAssignableFrom(cls); } public static boolean isPojo(Class cls) { return ! isPrimitives(cls) && ! Collection.class.isAssignableFrom(cls) && ! Map.class.isAssignableFrom(cls); } /** * name to class. * "boolean" => boolean.class * "java.util.Map[][]" => java.util.Map[][].class * * @param name name. * @return Class instance. */ public static Class name2class(String name) throws ClassNotFoundException { return name2class(getClassLoader(), name); } /** * name to class. * "boolean" => boolean.class * "java.util.Map[][]" => java.util.Map[][].class * * @param cl ClassLoader instance. * @param name name. * @return Class instance. */ private static Class name2class(ClassLoader cl, String name) throws ClassNotFoundException { int c = 0, index = name.indexOf('['); if( index > 0 ) { c = ( name.length() - index ) / 2; name = name.substring(0, index); } if( c > 0 ) { StringBuilder sb = new StringBuilder(); while( c-- > 0 ) sb.append("["); if( "void".equals(name) ) sb.append(JVM_VOID); else if( "boolean".equals(name) ) sb.append(JVM_BOOLEAN); else if( "byte".equals(name) ) sb.append(JVM_BYTE); else if( "char".equals(name) ) sb.append(JVM_CHAR); else if( "double".equals(name) ) sb.append(JVM_DOUBLE); else if( "float".equals(name) ) sb.append(JVM_FLOAT); else if( "int".equals(name) ) sb.append(JVM_INT); else if( "long".equals(name) ) sb.append(JVM_LONG); else if( "short".equals(name) ) sb.append(JVM_SHORT); else sb.append('L').append(name).append(';'); // "java.lang.Object" ==> "Ljava.lang.Object;" name = sb.toString(); } else { if( "void".equals(name) ) return void.class; else if( "boolean".equals(name) ) return boolean.class; else if( "byte".equals(name) ) return byte.class; else if( "char".equals(name) ) return char.class; else if( "double".equals(name) ) return double.class; else if( "float".equals(name) ) return float.class; else if( "int".equals(name) ) return int.class; else if( "long".equals(name) ) return long.class; else if( "short".equals(name) ) return short.class; } if( cl == null ){ cl = getClassLoader(); } Class clazz = NAME_CLASS_CACHE.get(name); if(clazz == null){ clazz = Class.forName(name, true, cl); NAME_CLASS_CACHE.put(name, clazz); } return clazz; } public static ClassLoader getClassLoader(){ return getClassLoader(PrimitiveUtils.class); } /** * get class loader * * @param cls * @return class loader */ public static ClassLoader getClassLoader(Class cls) { ClassLoader cl = null; try { cl = Thread.currentThread().getContextClassLoader(); } catch (Throwable ex) { // Cannot access thread context ClassLoader - falling back to system class loader... } if (cl == null) { // No thread context class loader -> use class loader of this class. cl = cls.getClassLoader(); } return cl; } private static final Map PRIMITIVES = new HashMap(); static { addPrimitive(boolean.class, "Z", Boolean.class, "booleanValue", false); addPrimitive(short.class, "S", Short.class, "shortValue", (short) 0); addPrimitive(int.class, "I", Integer.class, "intValue", 0); addPrimitive(long.class, "J", Long.class, "longValue", 0L); addPrimitive(float.class, "F", Float.class, "floatValue", 0F); addPrimitive(double.class, "D", Double.class, "doubleValue", 0D); addPrimitive(char.class, "C", Character.class, "charValue", '\0'); addPrimitive(byte.class, "B", Byte.class, "byteValue", (byte) 0); addPrimitive(void.class, "V", Void.class, null, null); } private static void addPrimitive(Class type, String typeCode, Class wrapperType, String unwrapMethod, T defaultValue) { PrimitiveInfo info = new PrimitiveInfo(type, typeCode, wrapperType, unwrapMethod, defaultValue); PRIMITIVES.put(type.getName(), info); PRIMITIVES.put(wrapperType.getName(), info); } /** 代表一个primitive类型的信息。 */ @SuppressWarnings("unused") private static class PrimitiveInfo { final Class type; final String typeCode; final Class wrapperType; final String unwrapMethod; final T defaultValue; public PrimitiveInfo(Class type, String typeCode, Class wrapperType, String unwrapMethod, T defaultValue) { this.type = type; this.typeCode = typeCode; this.wrapperType = wrapperType; this.unwrapMethod = unwrapMethod; this.defaultValue = defaultValue; } } /** * 取得primitive类型的wrapper。如果不是primitive,则原样返回。 *

* 例如: *

* * ClassUtil.getPrimitiveWrapperType(int.class) = Integer.class; * ClassUtil.getPrimitiveWrapperType(int[].class) = int[].class; * ClassUtil.getPrimitiveWrapperType(int[][].class) = int[][].class; * ClassUtil.getPrimitiveWrapperType(String[][].class) = String[][].class; * *

*

*/ @SuppressWarnings("unchecked") public static Class getWrapperTypeIfPrimitive(Class type) { if (type.isPrimitive()) { return ((PrimitiveInfo) PRIMITIVES.get(type.getName())).wrapperType; } return type; } /** * 取得primitive类型的默认值。如果不是primitive,则返回null。 *

* 例如: *

* * ClassUtil.getPrimitiveDefaultValue(int.class) = 0; * ClassUtil.getPrimitiveDefaultValue(boolean.class) = false; * ClassUtil.getPrimitiveDefaultValue(char.class) = '\0'; * *

*

*/ @SuppressWarnings("unchecked") public static T getPrimitiveDefaultValue(Class type) { PrimitiveInfo info = (PrimitiveInfo) PRIMITIVES.get(type.getName()); if (info != null) { return info.defaultValue; } return null; } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3