延迟任务的11种实现方式(上)!!【送源码】

您所在的位置:网站首页 延迟队列实现方式 延迟任务的11种实现方式(上)!!【送源码】

延迟任务的11种实现方式(上)!!【送源码】

2024-07-12 08:03| 来源: 网络整理| 查看: 265

延迟任务在我们日常生活中比较常见,比如订单支付超时取消订单功能,又比如自动确定收货的功能等等。

所以本篇文章就来从实现到原理来盘点延迟任务的11种实现方式,这些方式并没有绝对的好坏之分,只是适用场景的不大相同。

图片

DelayQueue

DelayQueue是JDK提供的api,是一个延迟队列

DelayQueue泛型参数得实现Delayed接口,Delayed继承了Comparable接口。

图片

getDelay方法返回这个任务还剩多久时间可以执行,小于0的时候说明可以这个延迟任务到了执行的时间了。

compareTo这个是对任务排序的,保证最先到延迟时间的任务排到队列的头。

来个demo @Getter public class SanYouTask implements Delayed {     private final String taskContent;     private final Long triggerTime;     public SanYouTask(String taskContent, Long delayTime) {         this.taskContent = taskContent;         this.triggerTime = System.currentTimeMillis() + delayTime * 1000;     }     @Override     public long getDelay(TimeUnit unit) {         return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);     }     @Override     public int compareTo(Delayed o) {         return this.triggerTime.compareTo(((SanYouTask) o).triggerTime);     } }

SanYouTask实现了Delayed接口,构造参数

taskContent:延迟任务的具体的内容

delayTime:延迟时间,秒为单位

测试

@Slf4j public class DelayQueueDemo {     public static void main(String[] args) {         DelayQueue sanYouTaskDelayQueue = new DelayQueue();         new Thread(() -> {             while (true) {                 try {                     SanYouTask sanYouTask = sanYouTaskDelayQueue.take();                     log.info("获取到延迟任务:{}", sanYouTask.getTaskContent());                 } catch (Exception e) {                 }             }         }).start();         log.info("提交延迟任务");         sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日记5s", 5L));         sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日记3s", 3L));         sanYouTaskDelayQueue.offer(new SanYouTask("三友的java日记8s", 8L));     } }

开启一个线程从DelayQueue中获取任务,然后提交了三个任务,延迟时间分为别5s,3s,8s。

测试结果:

图片

成功实现了延迟任务。

实现原理

图片

offer方法在提交任务的时候,会通过根据compareTo的实现对任务进行排序,将最先需要被执行的任务放到队列头。

take方法获取任务的时候,会拿到队列头部的元素,也就是队列中最早需要被执行的任务,通过getDelay返回值判断任务是否需要被立刻执行,如果需要的话,就返回任务,如果不需要就会等待这个任务到延迟时间的剩余时间,当时间到了就会将任务返回。

Timer

Timer也是JDK提供的api

先来demo @Slf4j public class TimerDemo {     public static void main(String[] args) {         Timer timer = new Timer();                  log.info("提交延迟任务");         timer.schedule(new TimerTask() {             @Override             public void run() {                 log.info("执行延迟任务");             }         }, 5000);     } }

通过schedule提交一个延迟时间为5s的延迟任务

图片

实现原理

提交的任务是一个TimerTask

public abstract class TimerTask implements Runnable {     //忽略其它属性          long nextExecutionTime; }

TimerTask内部有一个nextExecutionTime属性,代表下一次任务执行的时间,在提交任务的时候会计算出nextExecutionTime值。

Timer内部有一个TaskQueue对象,用来保存TimerTask任务的,会根据nextExecutionTime来排序,保证能够快速获取到最早需要被执行的延迟任务。

在Timer内部还有一个执行任务的线程TimerThread,这个线程就跟DelayQueue demo中开启的线程作用是一样的,用来执行到了延迟时间的任务。

所以总的来看,Timer有点像整体封装了DelayQueue demo中的所有东西,让用起来简单点。

虽然Timer用起来比较简单,但是在阿里规范中是不推荐使用的,主要是有以下几点原因:

Timer使用单线程来处理任务,长时间运行的任务会导致其他任务的延时处理

Timer没有对运行时异常进行处理,一旦某个任务触发运行时异常,会导致整个Timer崩溃,不安全

ScheduledThreadPoolExecutor

由于Timer在使用上有一定的问题,所以在JDK1.5版本的时候提供了ScheduledThreadPoolExecutor,这个跟Timer的作用差不多,并且他们的方法的命名都是差不多的,但是ScheduledThreadPoolExecutor解决了单线程和异常崩溃等问题。

来个demo @Slf4j public class ScheduledThreadPoolExecutorDemo {     public static void main(String[] args) {         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2, new ThreadPoolExecutor.CallerRunsPolicy());         log.info("提交延迟任务");         executor.schedule(() -> log.info("执行延迟任务"), 5, TimeUnit.SECONDS);     } }

结果

图片

实现原理

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,也就是继承了线程池,所以可以有很多个线程来执行任务。

ScheduledThreadPoolExecutor在构造的时候会传入一个DelayedWorkQueue阻塞队列,所以线程池内部的阻塞队列是DelayedWorkQueue。

图片

在提交延迟任务的时候,任务会被封装一个任务会被封装成ScheduledFutureTask对象,然后放到DelayedWorkQueue阻塞队列中。

图片

ScheduledFutureTask

ScheduledFutureTask实现了前面提到的Delayed接口,所以其实可以猜到DelayedWorkQueue会根据ScheduledFutureTask对于Delayed接口的实现来排序,所以线程能够获取到最早到延迟时间的任务。

当线程从DelayedWorkQueue中获取到需要执行的任务之后就会执行任务。

RocketMQ

RocketMQ是阿里开源的一款消息中间件,实现了延迟消息的功能,如果有对RocketMQ不熟悉的小伙伴可以看一下我之前写的RocketMQ保姆级教程和RocketMQ消息短暂而又精彩的一生 这两篇文章。

RocketMQ延迟消息的延迟时间默认有18个等级。

图片

当发送消息的时候只需要指定延迟等级即可。如果这18个等级的延迟时间不符和你的要求,可以修改RocketMQ服务端的配置文件。

来个demo

依赖

    org.apache.rocketmq     rocketmq-spring-boot-starter     2.2.1        org.springframework.boot     spring-boot-starter-web     2.2.5.RELEASE

配置文件

rocketmq:   name-server: 192.168.200.144:9876 #服务器ip:nameServer端口   producer:     group: sanyouProducer

controller类,通过DefaultMQProducer发送延迟消息到sanyouDelayTaskTopic这个topic,延迟等级为2,也就是延迟时间为5s的意思。

@RestController @Slf4j public class RocketMQDelayTaskController {     @Resource     private DefaultMQProducer producer;     @GetMapping("/rocketmq/add")     public void addTask(@RequestParam("task") String task) throws Exception {         Message msg = new Message("sanyouDelayTaskTopic", "TagA", task.getBytes(RemotingHelper.DEFAULT_CHARSET));         msg.setDelayTimeLevel(2);         // 发送消息并得到消息的发送结果,然后打印         log.info("提交延迟任务");         producer.send(msg);     } }

创建一个消费者,监听sanyouDelayTaskTopic的消息。

@Component @RocketMQMessageListener(consumerGroup = "sanyouConsumer", topic = "sanyouDelayTaskTopic") @Slf4j public class SanYouDelayTaskTopicListener implements RocketMQListener {     @Override     public void onMessage(String msg) {         log.info("获取到延迟任务:{}", msg);     } }

启动应用,浏览器输入以下链接添加任务

http://localhost:8080/rocketmq/add?task=sanyou

测试结果:

图片

实现原理

图片

生产者发送延迟消息之后,RocketMQ服务端在接收到消息之后,会去根据延迟级别是否大于0来判断是否是延迟消息

如果不大于0,说明不是延迟消息,那就会将消息保存到指定的topic中

如果大于0,说明是延迟消息,此时RocketMQ会进行一波偷梁换柱的操作,将消息的topic改成SCHEDULE_TOPIC_XXXX中,XXXX不是占位符,然后存储。

在BocketMQ内部有一个延迟任务,相当于是一个定时任务,这个任务就会获取SCHEDULE_TOPIC_XXXX中的消息,判断消息是否到了延迟时间,如果到了,那么就会将消息的topic存储到原来真正的topic(拿我们的例子来说就是sanyouDelayTaskTopic)中,之后消费者就可以从真正的topic中获取到消息了。

图片

定时任务

RocketMQ这种实现方式相比于前面提到的三种更加可靠,因为前面提到的三种任务内容都是存在内存的,服务器重启任务就丢了,如果要实现任务不丢还得自己实现逻辑,但是RocketMQ消息有持久化机制,能够保证任务不丢失。

RabbitMQ

RabbitMQ也是一款消息中间件,通过RabbitMQ的死信队列也可以是先延迟任务的功能。

demo

引入RabbitMQ的依赖

    org.springframework.boot     spring-boot-starter-amqp     2.2.5.RELEASE

配置文件

spring:   rabbitmq:     host: 192.168.200.144 #服务器ip     port: 5672     virtual-host: /

RabbitMQ死信队列的配置类,后面说原理的时候会介绍干啥的

@Configuration public class RabbitMQConfiguration {          @Bean     public DirectExchange sanyouDirectExchangee() {         return new DirectExchange("sanyouDirectExchangee");     }     @Bean     public Queue sanyouQueue() {         return QueueBuilder                 //指定队列名称,并持久化                 .durable("sanyouQueue")                 //设置队列的超时时间为5秒,也就是延迟任务的时间                 .ttl(5000)                 //指定死信交换机                 .deadLetterExchange("sanyouDelayTaskExchangee")                 .build();     }     @Bean     public Binding sanyouQueueBinding() {         return BindingBuilder.bind(sanyouQueue()).to(sanyouDirectExchangee()).with("");     }     @Bean     public DirectExchange sanyouDelayTaskExchange() {         return new DirectExchange("sanyouDelayTaskExchangee");     }     @Bean     public Queue sanyouDelayTaskQueue() {         return QueueBuilder                 //指定队列名称,并持久化                 .durable("sanyouDelayTaskQueue")                 .build();     }     @Bean     public Binding sanyouDelayTaskQueueBinding() {         return BindingBuilder.bind(sanyouDelayTaskQueue()).to(sanyouDelayTaskExchange()).with("");     } }

RabbitMQDelayTaskController用来发送消息,这里没指定延迟时间,是因为在声明队列的时候指定了延迟时间为5s

@RestController @Slf4j public class RabbitMQDelayTaskController {     @Resource     private RabbitTemplate rabbitTemplate;     @GetMapping("/rabbitmq/add")     public void addTask(@RequestParam("task") String task) throws Exception {         // 消息ID,需要封装到CorrelationData中         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());         log.info("提交延迟任务");         // 发送消息         rabbitTemplate.convertAndSend("sanyouDirectExchangee", "", task, correlationData);     } }

启动应用,浏览器输入以下链接添加任务

http://localhost:8080/rabbitmq/add?task=sanyou

测试结果,成功实现5s的延迟任务

图片

实现原理

图片

整个工作流程如下:

消息发送的时候会将消息发送到sanyouDirectExchange这个交换机上

由于sanyouDirectExchange绑定了sanyouQueue,所以消息会被路由到sanyouQueue这个队列上

由于sanyouQueue没有消费者消费消息,并且又设置了5s的过期时间,所以当消息过期之后,消息就被放到绑定的sanyouDelayTaskExchange死信交换机中

消息到达sanyouDelayTaskExchange交换机后,由于跟sanyouDelayTaskQueue进行了绑定,所以消息就被路由到sanyouDelayTaskQueue中,消费者就能从sanyouDelayTaskQueue中拿到消息了

上面说的队列与交换机的绑定关系,就是上面的配置类所干的事。

其实从这个单从消息流转的角度可以看出,RabbitMQ跟RocketMQ实现有相似之处。

消息最开始都并没有放到最终消费者消费的队列中,而都是放到一个中间队列中,等消息到了过期时间或者说是延迟时间,消息就会被放到最终的队列供消费者消息。

只不过RabbitMQ需要你显示的手动指定消息所在的中间队列,而RocketMQ是在内部已经做好了这块逻辑。

除了基于RabbitMQ的死信队列来做,RabbitMQ官方还提供了延时插件,也可以实现延迟消息的功能,这个插件的大致原理也跟上面说的一样,延时消息会被先保存在一个中间的地方,叫做Mnesia,然后有一个定时任务去查询最近需要被投递的消息,将其投递到目标队列中。

监听Redis过期key

在Redis中,有个发布订阅的机制

图片

生产者在消息发送时需要到指定发送到哪个channel上,消费者订阅这个channel就能获取到消息。图中channel理解成MQ中的topic。

并且在Redis中,有很多默认的channel,只不过向这些channel发送消息的生产者不是我们写的代码,而是Redis本身。这里面就有这么一个channel叫做__keyevent@__:expired,db是指Redis数据库的序号。

当某个Redis的key过期之后,Redis内部会发布一个事件到__keyevent@__:expired这个channel上,只要监听这个事件,那么就可以获取到过期的key。

所以基于监听Redis过期key实现延迟任务的原理如下:

将延迟任务作为key,过期时间设置为延迟时间

监听__keyevent@__:expired这个channel,那么一旦延迟任务到了过期时间(延迟时间),那么就可以获取到这个任务

来个demo

Spring已经实现了监听__keyevent@*__:expired这个channel这个功能,__keyevent@*__:expired中的*代表通配符的意思,监听所有的数据库。

所以demo写起来就很简单了,只需4步即可

依赖

    org.springframework.boot     spring-boot-starter-data-redis     2.2.5.RELEASE

配置文件

spring:   redis:     host: 192.168.200.144     port: 6379

配置类

@Configuration public class RedisConfiguration {     @Bean     public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {         RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();         redisMessageListenerContainer.setConnectionFactory(connectionFactory);         return redisMessageListenerContainer;     }     @Bean     public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {         return new KeyExpirationEventMessageListener(redisMessageListenerContainer);     } }

KeyExpirationEventMessageListener实现了对__keyevent@*__:expiredchannel的监听

图片

当KeyExpirationEventMessageListener收到Redis发布的过期Key的消息的时候,会发布RedisKeyExpiredEvent事件

图片

所以我们只需要监听RedisKeyExpiredEvent事件就可以拿到过期消息的Key,也就是延迟消息。

对RedisKeyExpiredEvent事件的监听实现MyRedisKeyExpiredEventListener

@Component public class MyRedisKeyExpiredEventListener implements ApplicationListener {     @Override     public void onApplicationEvent(RedisKeyExpiredEvent event) {         byte[] body = event.getSource();         System.out.println("获取到延迟消息:" + new String(body));     } }

代码写好,启动应用

之后我直接通过Redis命令设置消息,就没通过代码发送消息了,消息的key为sanyou,值为task,值不重要,过期时间为5s

set sanyou task expire sanyou 5

成功获取到延迟任务

图片

虽然这种方式可以实现延迟任务,但是这种方式坑比较多

任务存在延迟

Redis过期事件的发布不是指key到了过期时间就发布,而是key到了过期时间被清除之后才会发布事件。

而Redis过期key的两种清除策略,就是面试八股文常背的两种:

惰性清除。当这个key过期之后,访问时,这个Key才会被清除

定时清除。后台会定期检查一部分key,如果有key过期了,就会被清除

所以即使key到了过期时间,Redis也不一定会发送key过期事件,这就到导致虽然延迟任务到了延迟时间也可能获取不到延迟任务。

丢消息太频繁

Redis实现的发布订阅模式,消息是没有持久化机制,当消息发布到某个channel之后,如果没有客户端订阅这个channel,那么这个消息就丢了,并不会像MQ一样进行持久化,等有消费者订阅的时候再给消费者消费。

所以说,假设服务重启期间,某个生产者或者是Redis本身发布了一条消息到某个channel,由于服务重启,没有监听这个channel,那么这个消息自然就丢了。

消息消费只有广播模式

Redis的发布订阅模式消息消费只有广播模式一种。

所谓的广播模式就是多个消费者订阅同一个channel,那么每个消费者都能消费到发布到这个channel的所有消息。

图片

如图,生产者发布了一条消息,内容为sanyou,那么两个消费者都可以同时收到sanyou这条消息。

所以,如果通过监听channel来获取延迟任务,那么一旦服务实例有多个的话,还得保证消息不能重复处理,额外地增加了代码开发量。

接收到所有key的某个事件

这个不属于Redis发布订阅模式的问题,而是Redis本身事件通知的问题。

当监听了__keyevent@__:expired的channel,那么所有的Redis的key只要发生了过期事件都会被通知给消费者,不管这个key是不是消费者想接收到的。

所以如果你只想消费某一类消息的key,那么还得自行加一些标记,比如消息的key加个前缀,消费的时候判断一下带前缀的key就是需要消费的任务。

   ——EOF——

福利:

扫码回复【酒店】可免费领取酒店管理系统源码



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3