Java中的线程池种类

您所在的位置:网站首页 五种线程池的分类和作用是什么呢 Java中的线程池种类

Java中的线程池种类

2024-07-11 12:08| 来源: 网络整理| 查看: 265

1. newSingleThreadExecutor

创建方式:

ExecutorService pool = Executors.newSingleThreadExecutor();

一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

使用方式:

import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThreadPool {    public static void main(String[] args) {        ExecutorService pool = Executors.newSingleThreadExecutor();        for (int i = 0; i < 10; i++) {            pool.execute(() -> {                System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");            });        }    }}

输出结果如下:

pool-1-thread-1    开始发车啦....pool-1-thread-1    开始发车啦....pool-1-thread-1    开始发车啦....pool-1-thread-1    开始发车啦....pool-1-thread-1    开始发车啦....pool-1-thread-1    开始发车啦....pool-1-thread-1    开始发车啦....pool-1-thread-1    开始发车啦....pool-1-thread-1    开始发车啦....pool-1-thread-1    开始发车啦....

从输出的结果我们可以看出,一直只有一个线程在运行。

2.newFixedThreadPool

创建方式:

ExecutorService pool = Executors.newFixedThreadPool(10);

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

使用方式:

import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThreadPool {    public static void main(String[] args) {        ExecutorService pool = Executors.newFixedThreadPool(10);        for (int i = 0; i < 10; i++) {            pool.execute(() -> {                System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");            });        }    }}

输出结果如下:

pool-1-thread-1    开始发车啦....pool-1-thread-4    开始发车啦....pool-1-thread-3    开始发车啦....pool-1-thread-2    开始发车啦....pool-1-thread-6    开始发车啦....pool-1-thread-7    开始发车啦....pool-1-thread-5    开始发车啦....pool-1-thread-8    开始发车啦....pool-1-thread-9    开始发车啦....pool-1-thread-10 开始发车啦....

3. newCachedThreadPool

创建方式:

ExecutorService pool = Executors.newCachedThreadPool();

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程,当任务数增加时,此线程池又添加新线程来处理任务。

使用方式如上2所示。

4.newScheduledThreadPool

创建方式:

ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);

此线程池支持定时以及周期性执行任务的需求。

使用方式:

import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class ThreadPool {    public static void main(String[] args) {        ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);        for (int i = 0; i < 10; i++) {            pool.schedule(() -> {                System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");            }, 10, TimeUnit.SECONDS);        }    }}

上面演示的是延迟10秒执行任务,如果想要执行周期性的任务可以用下面的方式,每秒执行一次

//pool.scheduleWithFixedDelay也可以pool.scheduleAtFixedRate(() -> {                System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");}, 1, 1, TimeUnit.SECONDS);

5.newWorkStealingPool

newWorkStealingPool是jdk1.8才有的,会根据所需的并行层次来动态创建和关闭线程,通过使用多个队列减少竞争,底层用的ForkJoinPool来实现的。ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

说说线程池的拒绝策略

当请求任务不断的过来,而系统此时又处理不过来的时候,我们需要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。

AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。

public static class AbortPolicy implements RejectedExecutionHandler {    public AbortPolicy() { }    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        throw new RejectedExecutionException("Task " + r.toString() +                                                 " rejected from " +                                                 e.toString());    }}

CallerRunsPolicy 策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前的被丢弃的任务。

public static class CallerRunsPolicy implements RejectedExecutionHandler {    public CallerRunsPolicy() { }    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        if (!e.isShutdown()) {                r.run();        }    }}

DiscardOleddestPolicy策略: 该策略将丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {    public DiscardOldestPolicy() { }    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        if (!e.isShutdown()) {            e.getQueue().poll();            e.execute(r);        }    }}

DiscardPolicy策略:该策略默默的丢弃无法处理的任务,不予任何处理。

public static class DiscardPolicy implements RejectedExecutionHandler {    public DiscardPolicy() { }    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {    }}

除了JDK默认为什么提供的四种拒绝策略,我们可以根据自己的业务需求去自定义拒绝策略,自定义的方式很简单,直接实现RejectedExecutionHandler接口即可

比如Spring integration中就有一个自定义的拒绝策略CallerBlocksPolicy,将任务插入到队列中,直到队列中有空闲并插入成功的时候,否则将根据最大等待时间一直阻塞,直到超时。

package org.springframework.integration.util;import java.util.concurrent.BlockingQueue;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;public class CallerBlocksPolicy implements RejectedExecutionHandler {    private static final Log logger = LogFactory.getLog(CallerBlocksPolicy.class);    private final long maxWait;    /**     * @param maxWait The maximum time to wait for a queue slot to be     * available, in milliseconds.     */    public CallerBlocksPolicy(long maxWait) {        this.maxWait = maxWait;    }    @Override    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {        if (!executor.isShutdown()) {            try {                BlockingQueue queue = executor.getQueue();                if (logger.isDebugEnabled()) {                    logger.debug("Attempting to queue task execution for " + this.maxWait + " milliseconds");                }                if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) {                    throw new RejectedExecutionException("Max wait time expired to queue task");                }                if (logger.isDebugEnabled()) {                    logger.debug("Task execution queued");                }            }            catch (InterruptedException e) {                Thread.currentThread().interrupt();                throw new RejectedExecutionException("Interrupted", e);            }        }        else {            throw new RejectedExecutionException("Executor has been shut down");        }    }}

定义好之后如何使用呢?光定义没用的呀,一定要用到线程池中呀,可以通过下面的方式自定义线程池,指定拒绝策略。

BlockingQueue workQueue = new ArrayBlockingQueue(100);ThreadPoolExecutor executor = new ThreadPoolExecutor(    10, 100, 10, TimeUnit.SECONDS, workQueue, new CallerBlocksPolicy()); execute和submit的区别?

在前面的讲解中,我们执行任务是用的execute方法,除了execute方法,还有一个submit方法也可以执行我们提交的任务。

这两个方法有什么区别呢?分别适用于在什么场景下呢?我们来做一个简单的分析。

execute适用于不需要关注返回值的场景,只需要将线程丢到线程池中去执行就可以了

public class ThreadPool {    public static void main(String[] args) {        ExecutorService pool = Executors.newFixedThreadPool(10);        pool.execute(() -> {            System.out.println(Thread.currentThread().getName() + "\t开始发车啦....");        });    }}

submit方法适用于需要关注返回值的场景,submit方法的定义如下:

public interface ExecutorService extends Executor {  ...   Future submit(Callable task);   Future submit(Runnable task, T result);  Future submit(Runnable task);  ...}

其子类AbstractExecutorService实现了submit方法,可以看到无论参数是Callable还是Runnable,最终都会被封装成RunnableFuture,然后再调用execute执行。

   /**     * @throws RejectedExecutionException {@inheritDoc}     * @throws NullPointerException       {@inheritDoc}     */    public Future


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3