使用RabbitMQ实现未支付订单在30分钟后自动过期

您所在的位置:网站首页 未设置支付策略 使用RabbitMQ实现未支付订单在30分钟后自动过期

使用RabbitMQ实现未支付订单在30分钟后自动过期

2023-11-21 23:03| 来源: 网络整理| 查看: 265

延迟队列可以实现消息在投递到Exchange之后,经过一定的时间之后再路由到相应的Queue。最后被消费者监听消费。即:生产者投递的消息经过一段时间之后再被消费者消费。

常见业务场景:订单在30分钟内还未支付则自动取消。

该业务的其他实现方案:

使用Redis,设置过期时间,监听过期事件。

使用RabbitMQ的过期队列与死信队列,设置消息的存活时间,在设置的时间内未被消费,即会投递到死信队列,我们监听死信队列即可。可参考上一篇文章RabbitMQ死信队列在SpringBoot中的使用。

本文介绍使用RabbitMQ延迟队列来实现。

RabbitMQ实现延迟队列需要依赖插件rabbitmq-delayed-message-exchange。

# 安装延迟队列插件:

RabbitMQ插件列表: https://www.rabbitmq.com/community-plugins.html

延迟队列插件下载地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

将下载的插件copy到RabbitMQ的plugins目录

Mac下的插件路径为/usr/local/Cellar/rabbitmq/3.7.15/plugins

安装并启用

plugin

重启RabbitMQ

# 业务相关代码编写(实现订单在规定的时间内还未支付则过期)

订单实体(仅保留相关字段)

OrderEntity

订单状态枚举(仅保留相关状态)

image.png

OrderMapper

/**  * @author futao  * @date 2020/4/14.  */ public interface OrderMapper extends BaseMapper { }

模拟下定的接口OrderController 为了简单起见,省略了Service层.

OrderController

# RabbitMQ相关代码编写

配置文件

spring:   rabbitmq:     host: localhost     port: 5672     username: futao     password: 123456789     virtual-host: delay-vh     connection-timeout: 15000     # 发送确认     publisher-confirms: true     # 路由失败回调     publisher-returns: true     template:       # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃       mandatory: true     listener:       simple:         # 每次从RabbitMQ获取的消息数量         prefetch: 1         default-requeue-rejected: false         # 每个队列启动的消费者数量         concurrency: 1         # 每个队列最大的消费者数量         max-concurrency: 1         # 手动签收ACK         acknowledge-mode: manual app:   rabbitmq:     # 延迟时长设置     delay:       order: 10S     # 队列定义     queue:       order:         delay: order-delay-queue     # 交换机定义     exchange:       order:         delay: order-delay-exchange

延迟交换机,队列定义与绑定

/**  * 队列,交换机定义与绑定  * 延迟队列插件`rabbitmq-delayed-message-exchange`下载地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange  *  * @author futao  * @date 2020/4/10.  */ @Configuration public class Declare {     /**      * 订单队列 - 接收延迟投递的订单      *      * @param orderQueue 订单队列名称      * @return      */     @Bean     public Queue orderDelayQueue(@Value("${app.rabbitmq.queue.order.delay}") String orderQueue) {         return QueueBuilder                 .durable(orderQueue)                 .build();     }     /**      * 订单交换机-延迟交换机 - 消息延迟一定时间之后再投递到绑定的队列      *      * @param orderExchange 订单延迟交换机      * @return      */     @Bean     public Exchange orderDelayExchange(@Value("${app.rabbitmq.exchange.order.delay}") String orderExchange) {         Map args = new HashMap(1);         args.put("x-delayed-type", "topic");         return new CustomExchange(orderExchange, "x-delayed-message", true, false, args);     }     /**      * 订单队列-交换机 绑定      *      * @param orderQueue         订单队列      * @param orderDelayExchange 订单交换机      * @return      */     @Bean     public Binding orderBinding(Queue orderDelayQueue, Exchange orderDelayExchange) {         return BindingBuilder                 .bind(orderDelayQueue)                 .to(orderDelayExchange)                 .with("order.delay.*")                 .noargs();     } }

可以看出队列就是普通的队列。重点在交换机的设定上。声明延迟交换机需要设置参数x-delayed-type,值为交换机类型,可以是fanout,topic,direct。并且设置交换机的type为x-delayed-message。

定义完成后可以启动SpringBoot应用程序,在RabbitMQ管理后台查看Exchange和Queue。

delay-exchange

可以看到,除了默认的交换机,SpringBoot已经帮我们创建好了延迟交换机order-delay-exchange,并且此时messages delayed为0,因为我们还未向交换机投递消息。

可以继续查看交换机的路由类型与绑定的队列

ExchangeDetail

在查看队列,为普通的队列

回到代码中,定义消息生产者

Orderender

在消息投递之前为每条消息都设置了延迟时长setDelay()。 调用消费者的代码在上面OrderController中,下定之后,订单数据落库,并且向MQ中投递延迟消息。可以回头看看。

消费者-监听过期的订单信息,并且将DB中相应的订单设置为已过期。

OrderConsumer

为了方便查看到延迟投递的效果,我在消息投递和接收处都打印了日志,测试时可以看到消息投递和消息的时间间隔。

# 测试

把订单过期时长设置为10S

app:   rabbitmq:     delay:       order: 10S

下定

log

DB100

可以看到,打印出了投递日志,订单主键为666ae86aabe2a1b3120b34bb5f447bbe的订单在2020-04-14 22:22:04.307进行了投递,此时数据库中该订单的状态为100,待支付。

此时查看Exchange详情可以发现,messages delayed:1,即目前有一条消息处于延迟状态。

ExchangeDetail

等待10S后。

log

可以看到OrderConsumer在10S后2020-04-14 22:22:14.320接收到了主键为666ae86aabe2a1b3120b34bb5f447bbe的订单消息。距离投递时间2020-04-14 22:22:04.307为10S。此时查看DB中订单状态:

DB

订单状态为200已过期,且过期时间为2020-04-14 22:22:14

达到了订单在我们指定的时间后过期。

再测试几条一分钟的场景

app:   rabbitmq:     delay:       order: 1M

Exchange

log

消息都在延迟1分钟后投递到了队列-消费者。

# 严重风险提示:

在实际业务使用中,如果消费者的消费能力比较低下,会存在已经过期的消息阻塞积压在队列,无法在指定的时间内过期,导致业务出现异常。实际上,按照我们业务意图,队里Queue里是不应该有大量消息存在的,因为投递到过期队列的消息已经是过期了的,应该立即被消费掉。

进行测试: 为了降低消费者的消费能力,进行如下处理:

设置消费者的最大并发数为1,并进行手动签收。

    listener:       simple:         # 每个队列启动的消费者数量         concurrency: 1         # 每个队列最大的消费者数量         max-concurrency: 1         acknowledge-mode: manual app:   rabbitmq:     delay:       # 订单过期时间为1分钟       order: 1M

消费者在处理消息时休眠5S

向MQ投递两条消息,预期两条消息都在1分钟后正常过期。

结果(去除了无关信息):

result

2020-04-15 20:18:05.269 OrderSender : 订单[d6fd965b11f8db0fafb762d305db83b0]投递到MQ 2020-04-15 20:18:05.765 OrderSender : 订单[77ceb7f1bfbbcaf627224ac75e96b0e5]投递到MQ 2020-04-15 20:19:05.279 OrderConsumer : 消费者接收到延迟订单[d6fd965b11f8db0fafb762d305db83b0] 2020-04-15 20:19:15.316 OrderConsumer : 订单业务处理结束.....进行消息ack签收 2020-04-15 20:19:15.318 OrderConsumer : 消费者接收到延迟订单[77ceb7f1bfbbcaf627224ac75e96b0e5] 2020-04-15 20:19:25.330 OrderConsumer : 订单业务处理结束.....进行消息ack签收

第一个订单d6fd965b11f8db0fafb762d305db83b0投递时间为2020-04-15 20:18:05.269。1分钟后2020-04-15 20:19:05.279接收到了通知,并且处理了10S后进行了签收ack。 第二个订单77ceb7f1bfbbcaf627224ac75e96b0e5投递时间为2020-04-15 20:18:05.765。1分钟过后并没有收到通知,而是在第一个订单处理完毕之后,2020-04-15 20:19:15.318才收到了通知,比预期的时间长了10秒,实际延迟时间为1分钟+10秒。出现了业务异常。

导致这个问题的原因就是消费者无法及时消费消息并更新订单状态。所以我们在进行开发时,需要考虑实际的数据量大小,消费者消费能力。及时关注队列消息积压情况,灵活调整消费者并发数量,优化消费者代码,提高消费者消费能力。保证代码的执行结果符合我们的预期。

本文涉及的源代码:

https://github.com/FutaoSmile/springboot-learn-integration/releases/tag/v_rabbitmq_delay_queue



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3