【java并发】ScheduledThreadPoolExecutor详解 |
您所在的位置:网站首页 › ScheduledThreadPoolExecutor原理 › 【java并发】ScheduledThreadPoolExecutor详解 |
文章目录
ScheduledThreadPoolExecutor详解一、ScheduledThreadPoolExecutor简介二、处理流程三、ScheduledThreadPoolExecutor基本元素3.1 构造函数3.2 RunnableScheduledFuture 接口3.3 DelayedWorkQueue 延时队列3.3.1 add任务入队3.3.2 take任务出队
四、线程池任务调度4.1 schedule延迟执行任务4.2 sheduleAtFixedRate以固定的周期执行任务4.3 scheduleWithFixedDelay 以固定的延迟周期执行任务
五、总结
ScheduledThreadPoolExecutor详解
一、ScheduledThreadPoolExecutor简介
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运 行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但 ScheduledThreadPoolExecutor功能更强大、更灵活。Timer对应的是单个后台线程,而 ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。 二、处理流程1、它的处理流程如下:主线程提交的定时任务或延时任务采用DelayQueue存储。 2、DelayQueue内部封装了一个PriorityQueue(优先队列,是一种堆结构),它会根据time的先后时间排序,若 time相同则根据sequenceNumber排序;DelayQueue也是一个无界队列; 3、工作线程会从DelayQueue取已经到期的任务去执行; 4、执行结束后重新设置任务的到期时间,再次放回DelayQueue 三、ScheduledThreadPoolExecutor基本元素 3.1 构造函数一共有 4 个构造函数,全部都是调用的父类(ThreadPoolExecutor)的构造函数,任务队列使用的是 DelayedWorkQueue: // 第一个构造函数 public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } // 第二个构造函数 public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory); } // 第三个构造函数 public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), handler); } // 第四个构造函数 public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory, handler); } 3.2 RunnableScheduledFuture 接口ScheduledThreadPoolExecutor 会将所有实现了 Runnable 接口的任务包装成 RunnableScheduledFuture 接口对象。而 ScheduledFutureTask 就是 RunnableScheduledFuture 接口的实现类: RunnableScheduledFuture t = decorateTask(command, new ScheduledFutureTask(command, null, triggerTime(delay, unit), sequencer.getAndIncrement()));ScheduledFutureTask 的源码如下: private class ScheduledFutureTask extends FutureTask implements RunnableScheduledFuture { // 任务序号,自增ID private final long sequenceNumber; // 首次执行的时间点(单位:纳秒) private volatile long time; /** * 任务类型标识 * * 0: 非周期任务 * >0: fixed-rate 任务 * = queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { // 堆上浮操作 siftUp(i, e); } // 如果当前添加的元素是队首的元素 if (queue[0] == e) { leader = null; // 唤醒一个等待获取元素的线程 available.signal(); } } finally { // 解锁 lock.unlock(); } return true; }要点如下: 如果新任务入队的时候,队列已满,则会触发扩容操作。扩容时,每次增加原来容量的 50%,新容量是原容量的 1.5 倍。新任务入队之后,会执行堆上浮操作,根据小顶堆的特性,将新加入的任务调整到堆中合适的位置。 3.3.2 take任务出队通过调用 take 方法,获取待执行的任务: public RunnableScheduledFuture take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 轮询 for (;;) { // 每次都从队首获取待执行的任务 RunnableScheduledFuture first = queue[0]; // 如果队列为空,等待任务入队 if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); // 如果队首任务已到期,将队首的任务取出来 if (delay |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |