RabbitMQ消息队列处理库存解锁及关闭订单问题

您所在的位置:网站首页 mq事务消息怎么回滚 RabbitMQ消息队列处理库存解锁及关闭订单问题

RabbitMQ消息队列处理库存解锁及关闭订单问题

2024-07-10 02:42| 来源: 网络整理| 查看: 265

文章目录 一、RabbitMQ延时队列消息的TTL死信 二、实战延时关单规范设计 三、消息队列处理库存解锁及关单1、流程分析2、库存微服务2.1 解锁库存配置2.2 解锁库存流程2.3 业务代码2.4 调试 四、RMQ 延时队列处理关单及库存解锁整合1、流程分析2、订单关单3、订单释放和库存释放进行绑定 五、消息丢失、重复、积压等解决方案1、消息丢失2、消息重复3、消息积压

一、RabbitMQ延时队列

RabbitMQ延时队列实现定时任务。 场景: 比如未付款订单,超过一定时间后,系统自动取消订单并释放占有的库存。 常用解决方案: spring的schedule定时任务轮训数据库 缺点: 消耗系统内存,增加了数据库的压力、存在较大的时间误差 解决: Rabbit的消息 TTL 和死信Exchange结合。

消息的TTL

消息的TTL(Time To Live)就是消息的存活时间,单位是毫秒。。 RabbitMQ 可以对队列和消息分别设置TTL。

对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就是死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration 字段或者 x-message-ttl 属性来设置时间,两者是一样的效果。

注意:延时消息放入到队列中,没有被任何消费者监听,如果监听就拿到了,也就被消费了,队列里边的消息只要一过设置的过期时间,就成了死信队列,服务器就会丢弃。 那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:

Map args = new HashMap(); args.put(“x-message-ttl”, 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

这样所有被投递到该队列的消息都最多不会存活超过6s。 另一种方式便是针对每条消息设置TTL,代码如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration(“6000”); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, “msg body”.getBytes());

这样这条消息的过期时间也被设置成了6s。 但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列没有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

死信

死信:Dead Letter Exchange(DLX) 一个消息在满足如下条件,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。

一个消息被Consumer拒收了,并且reject方法的参数里 requeue 是false。也就是说不会被放在队列里,被其他消费者使用。(basic.reject/basic.nack) requeue=false上面的消息的TTL到了,消息过期了。队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由。

Dead Letter Exchange 其实就是一种普通的 exchange,和创建其他exchange一样。只是在某一个设置Dead Letter Exchange 的队列中有信息过期了,会自动触发消息的转发,发送到 Dead Letter Exhange中去。 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列。 在这里插入图片描述

二、实战 延时关单

场景:用户下单,过了30分钟没有支付,系统会默认关闭该订单,以前可以用定时任务做,现在使用延时队列。 在这里插入图片描述

规范设计

设计建议规范(基于事件模型的交换机设计): 1、交换机命名:业务+exchange;交换机为Topic 2、路由键:事件.需要感知的业务(可以不写) 3、队列命名:事件+想要监听服务名+queue 4、绑定关系:事件.感知的业务(#) 整体业务设计: 在这里插入图片描述 按照上边的规范设计,对关单业务进行升级设计: 在这里插入图片描述 上图说明:交换机 order-event-exchange 绑定了一个延时队列order.delay.queue,路由key是 order.create.order, 当创建了一个订单时,会发消息到该延时队列,等到TTL过期,变为死信,会自动触发消息的转发,发送到 Dead Letter Exhange(order-event-exchange) 中去,注意死信路由是 order.release.order,然后exchange根据路由key order.release.order转发消息到 order.release.order.queue队列,客户端监听该队列获取消息。 根据上图的业务设计分析,需要创建两个队列,一个交换机,和两个绑定。 gulimall-order/xxx/order/config/MyMQConfig.java

package com.atguigu.gulimall.order.config; import com.atguigu.gulimall.order.entity.OrderEntity; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.HashMap; /** * @author: kaiyi * @create: 2020-09-16 13:53 */ @Configuration public class MyMQConfig { /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */ /** * 客户端监听队列(测试) * @param orderEntity * @param channel * @param message * @throws IOException */ @RabbitListener(queues = "order.release.order.queue") public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } /** * 死信队列 * * @return */ @Bean public Queue orderDelayQueue(){ /* Queue(String name, 队列名字 boolean durable, 是否持久化 boolean exclusive, 是否排他 boolean autoDelete, 是否自动删除 Map arguments) 属性 */ HashMap arguments = new HashMap(); arguments.put("x-dead-letter-exchange", "order-event-exchange"); arguments.put("x-dead-letter-routing-key", "order.release.order"); arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; } /** * 普通队列 * * @return */ @Bean public Queue orderReleaseQueue(){ Queue queue = new Queue("order.release.order.queue", true, false, false); return queue; } /** * TopicExchange * * @return */ @Bean public Exchange orderEventExchange(){ /* * String name, * boolean durable, * boolean autoDelete, * Map arguments * */ return new TopicExchange("order-event-exchange", true, false); } @Bean public Binding orderCreateBinding() { /* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map arguments * */ return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", // 路由key一般为事件名 null); } @Bean public Binding orderReleaseBinding() { return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); } }

然后在控制器创建测试消息: gulimall-order/xxx/order/web/HelloController.java

** * @author: kaiyi * @create: 2020-09-12 18:09 */ @Controller public class HelloController { @Autowired private RabbitTemplate rabbitTemplate; @ResponseBody @GetMapping(value = "/test/createOrder") public String createOrderTest() { //订单下单成功 OrderEntity orderEntity = new OrderEntity(); orderEntity.setOrderSn(UUID.randomUUID().toString()); orderEntity.setModifyTime(new Date()); //给MQ发送消息 rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity); return "ok"; } }

然后访问该路径 http://order.gulimall.com/test/createOrder, 发送消息,然后去RMQ管理界面可以看到创建的消息已经成功了。 交换机: 在这里插入图片描述 交换机绑定的队列(路由key): 在这里插入图片描述 队列: 在这里插入图片描述 可以看到第一个队列是死信队列,第二个事普通队列 收到的消息为实体对象json: 在这里插入图片描述 控制器输出的监控信息:

收到过期的订单信息:准备关闭订单321c3329-d57a-4613-a4ff-331066d4105a 收到过期的订单信息:准备关闭订单44fcf65f-1e7a-40c6-8336-a6c60362920b

三、消息队列处理库存解锁及关单 1、流程分析

在这里插入图片描述 在这里插入图片描述

2、库存微服务 2.1 解锁库存配置

1、库存微服务gulimall-ware 引入高级消息队列amqp依赖: gulimall-ware/pom.xml

org.springframework.boot spring-boot-starter-amqp

2、添加RMQ配置 gulimall-ware/src/main/resources/application.properties

# ===== RabbitMQ配置 ====== spring.rabbitmq.host=192.168.10.10 spring.rabbitmq.port=5672 # 虚拟主机配置 spring.rabbitmq.virtual-host=/ # 开启发送端消息抵达Broker确认 spring.rabbitmq.publisher-confirms=true # 开启发送端消息抵达Queue确认 spring.rabbitmq.publisher-returns=true # 只要消息抵达Queue,就会异步发送优先回调returnfirm spring.rabbitmq.template.mandatory=true # 手动ack消息,不使用默认的消费端确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual

3、创建RMQ配置文件 gulimall-ware/xxx/ware/config/MyRabbitMQConfig.java

package com.atguigu.gulimall.ware.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; /** * RMQ配置 * * @author: kaiyi * @createTime: 2020-09-15 16:40 **/ @Configuration public class MyRabbitMQConfig { /** * 使用JSON序列化机制,进行消息转换 * @return */ @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } // @RabbitListener(queues = "stock.release.stock.queue") // public void handle(Message message) { // // } /** * 库存服务默认的交换机 * @return */ @Bean public Exchange stockEventExchange() { //String name, boolean durable, boolean autoDelete, Map arguments TopicExchange topicExchange = new TopicExchange("stock-event-exchange", true, false); return topicExchange; } /** * 普通队列 * @return */ @Bean public Queue stockReleaseStockQueue() { //String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments Queue queue = new Queue("stock.release.stock.queue", true, false, false); return queue; } /** * 延迟队列 * @return */ @Bean public Queue stockDelay() { HashMap arguments = new HashMap(); arguments.put("x-dead-letter-exchange", "stock-event-exchange"); arguments.put("x-dead-letter-routing-key", "stock.release"); // 消息过期时间 2分钟 arguments.put("x-message-ttl", 120000); Queue queue = new Queue("stock.delay.queue", true, false, false,arguments); return queue; } /** * 交换机与普通队列绑定 * @return */ @Bean public Binding stockLocked() { //String destination, DestinationType destinationType, String exchange, String routingKey, // Map arguments Binding binding = new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null); return binding; } /** * 交换机与延迟队列绑定 * @return */ @Bean public Binding stockLockedBinding() { return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null); } } 2.2 解锁库存流程

解锁库存流程:

在这里插入图片描述 可以看到,在锁定库存时,我们增加了库存工作单,用来记录库存锁定的明细记录,如果库存锁定异常,则会回滚,该表不会有数据记录,如果锁定成功,则会有具体的锁定记录,锁定成功后会发送消息到延时队列,过段时间会根据订单创建的状态(订单取消或订单未创建成功)来解锁库存。 解锁库存具体步骤: 在这里插入图片描述

2.3 业务代码

锁库存 锁库存,并发送消息到延时队列,方法 orderLockStock(WareSkuLockVo vo) gulimall-ware/xxx/ware/service/impl/WareSkuServiceImpl.java

package com.atguigu.gulimall.ware.service.impl; import com.alibaba.fastjson.TypeReference; import com.atguigu.common.exception.NoStockException; import com.atguigu.common.to.mq.StockDetailTo; import com.atguigu.common.to.mq.StockLockedTo; import com.atguigu.common.utils.R; import com.atguigu.gulimall.ware.entity.WareOrderTaskDetailEntity; import com.atguigu.gulimall.ware.entity.WareOrderTaskEntity; import com.atguigu.gulimall.ware.feign.OrderFeignService; import com.atguigu.gulimall.ware.feign.ProductFeignService; import com.atguigu.gulimall.ware.service.WareOrderTaskDetailService; import com.atguigu.gulimall.ware.service.WareOrderTaskService; import org.springframework.transaction.annotation.Transactional; @Service("wareSkuService") public class WareSkuServiceImpl extends ServiceImpl implements WareSkuService { @Autowired WareSkuDao wareSkuDao; @Autowired ProductFeignService productFeignService; @Autowired private WareOrderTaskService wareOrderTaskService; @Autowired private WareOrderTaskDetailService wareOrderTaskDetailService; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private OrderFeignService orderFeignService; /** * 为某个订单锁定库存 * @param vo * @return */ @Transactional(rollbackFor = Exception.class) @Override public boolean orderLockStock(WareSkuLockVo vo) { /** * 保存库存工作单详情信息 * 追溯 */ WareOrderTaskEntity wareOrderTaskEntity = new WareOrderTaskEntity(); wareOrderTaskEntity.setOrderSn(vo.getOrderSn()); wareOrderTaskEntity.setCreateTime(new Date()); wareOrderTaskService.save(wareOrderTaskEntity); //1、按照下单的收货地址,找到一个就近仓库,锁定库存 //2、找到每个商品在哪个仓库都有库存 List locks = vo.getLocks(); List collect = locks.stream().map((item) -> { SkuWareHasStock stock = new SkuWareHasStock(); Long skuId = item.getSkuId(); stock.setSkuId(skuId); stock.setNum(item.getCount()); //查询这个商品在哪个仓库有库存 List wareIdList = wareSkuDao.listWareIdHasSkuStock(skuId); stock.setWareId(wareIdList); return stock; }).collect(Collectors.toList()); //2、锁定库存 for (SkuWareHasStock hasStock : collect) { boolean skuStocked = false; Long skuId = hasStock.getSkuId(); List wareIds = hasStock.getWareId(); if (org.springframework.util.StringUtils.isEmpty(wareIds)) { //没有任何仓库有这个商品的库存,抛出异常,前边已经锁定的库存也一起会回滚 throw new NoStockException(skuId); } //1、如果每一个商品都锁定成功,将当前商品锁定了几件的工作单记录发给MQ //2、锁定失败。前面保存的工作单信息都回滚了。发送出去的消息,即使要解锁库存,由于在数据库查不到指定的id,所有就不用解锁 for (Long wareId : wareIds) { //锁定成功就返回1,失败就返回0 Long count = wareSkuDao.lockSkuStock(skuId,wareId,hasStock.getNum()); if (count == 1) { skuStocked = true; WareOrderTaskDetailEntity taskDetailEntity = WareOrderTaskDetailEntity.builder() .skuId(skuId) .skuName("") .skuNum(hasStock.getNum()) .taskId(wareOrderTaskEntity.getId()) .wareId(wareId) .lockStatus(1) .build(); wareOrderTaskDetailService.save(taskDetailEntity); //TODO 告诉MQ库存锁定成功 StockLockedTo lockedTo = new StockLockedTo(); lockedTo.setId(wareOrderTaskEntity.getId()); StockDetailTo detailTo = new StockDetailTo(); BeanUtils.copyProperties(taskDetailEntity,detailTo); lockedTo.setDetailTo(detailTo); rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo); break; } else { //当前仓库锁失败,重试下一个仓库 } } if (skuStocked == false) { //当前商品所有仓库都没有锁住 throw new NoStockException(skuId); } } //3、肯定全部都是锁定成功的 return true; } @Override public void unlockStock(StockLockedTo to) { //库存工作单的id StockDetailTo detail = to.getDetailTo(); Long detailId = detail.getId(); /** * 解锁 * 1、查询数据库关于这个订单锁定库存信息 * 有:证明库存锁定成功了 * 解锁:订单状况 * 1、没有这个订单,必须解锁库存 * 2、有这个订单,不一定解锁库存 * 订单状态:已取消:解锁库存 * 已支付:不能解锁库存 */ WareOrderTaskDetailEntity taskDetailInfo = wareOrderTaskDetailService.getById(detailId); if (taskDetailInfo != null) { //查出wms_ware_order_task工作单的信息 Long id = to.getId(); WareOrderTaskEntity orderTaskInfo = wareOrderTaskService.getById(id); //获取订单号查询订单状态 String orderSn = orderTaskInfo.getOrderSn(); //远程查询订单信息 R orderData = orderFeignService.getOrderStatus(orderSn); if (orderData.getCode() == 0) { //订单数据返回成功 OrderVo orderInfo = orderData.getData("data", new TypeReference() {}); //判断订单状态是否已取消或者支付或者订单不存在 if (orderInfo == null || orderInfo.getStatus() == 4) { //订单已被取消,才能解锁库存 if (taskDetailInfo.getLockStatus() == 1) { //当前库存工作单详情状态1,已锁定,但是未解锁才可以解锁 unLockStock(detail.getSkuId(),detail.getWareId(),detail.getSkuNum(),detailId); } } } else { //消息拒绝以后重新放在队列里面,让别人继续消费解锁 //远程调用服务失败 throw new RuntimeException("远程调用服务失败"); } } else { //无需解锁 } } /** * 解锁库存的方法 * @param skuId * @param wareId * @param num * @param taskDetailId */ public void unLockStock(Long skuId,Long wareId,Integer num,Long taskDetailId) { //库存解锁 wareSkuDao.unLockStock(skuId,wareId,num); //更新工作单的状态 WareOrderTaskDetailEntity taskDetailEntity = new WareOrderTaskDetailEntity(); taskDetailEntity.setId(taskDetailId); taskDetailEntity.setLockStatus(2); //变为已解锁 wareOrderTaskDetailService.updateById(taskDetailEntity); } @Data class SkuWareHasStock { private Long skuId; private Integer num; private List wareId; } }

监听队列: gulimall-ware/xxx/ware/listener/StockReleaseListener.java

package com.atguigu.gulimall.ware.listener; import com.atguigu.common.to.mq.StockLockedTo; import com.atguigu.gulimall.ware.service.WareSkuService; import com.rabbitmq.client.Channel; /** * 库存解锁监听 * * @desc * 库存锁定成功发送消息到延时队列 stock.locked(路由key),超时TTL,消息进入私信路由,然后转发到解锁库存的队列。 * * @author: kaiyi * @create: 2020-09-16 19:01 */ @Slf4j @RabbitListener(queues = "stock.release.stock.queue") @Service public class StockReleaseListener { @Autowired private WareSkuService wareSkuService; /** * 1、库存自动解锁 * 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁 * * 2、订单失败 * 库存锁定失败 * * 只要解锁库存的消息失败,一定要告诉服务解锁失败 */ @RabbitHandler public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException { log.info("******收到解锁库存的信息******"); try { //当前消息是否被第二次及以后(重新)派发过来了 // Boolean redelivered = message.getMessageProperties().getRedelivered(); //解锁库存 wareSkuService.unlockStock(to); // 手动删除消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { // 解锁失败 将消息重新放回队列,让别人消费 channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } } }

上边就是创建订单后锁库存,发消息到延时队列,监听队列,创建的订单出现异常是否来解锁库存,手动确认消息的核心代码逻辑。

2.4 调试

订单结算页,购买了一件商品。 在这里插入图片描述 在提交订单时,远程锁库存成功后模拟代码异常: 在这里插入图片描述 提交订单,由于异常会回滚订单并且回退到结算页,连续提交三次,我们可以看到延时队列里边有三条信息。 在这里插入图片描述 库存表wms_ware_sku,原来锁定了 3 件库存,现在库存锁定为6,因为库存是远程锁定的,所以,主程序事务回滚对远程的不起作用,不过在锁定库存成功时发库存锁定成功的消息,后边通过消息会检查是否释放库存。 在这里插入图片描述 库存工作单主表: 在这里插入图片描述 库存工作单明细表,我们可以看到新增的3条记录,明细状态lock_status(1-已锁定 2-已解锁 3-扣减) 在这里插入图片描述 订单服务订单表: 在这里插入图片描述 然后等到消息过期进入死信路由,TTL后客户端监听消息判断是否释放库存,消息在判断的时候先根据生成的订单号远程查询gulimall-order是否存在对应的订单,如果不存在,则直接释放锁定的库存,因为在生成订单的时候抛出异常生成的订单回滚了,所以 oms_order 表不存在订单,这时监听的消息拿到延时消息后,做完判断后会触发解锁库存的动作。 过了几分钟后,我们可以看到消息已经被消费了,并且锁定的库存也释放了,变回原来的 3 件。 消息队列: 在这里插入图片描述 库存表: 在这里插入图片描述 可以看到,RMQ在解决分布式事务一致性问题上非常强大,不仅实现了解耦,而且还保证了可靠消息+最终一致性。

四、RMQ 延时队列处理关单及库存解锁整合 1、流程分析

在这里插入图片描述 步骤:

1、订单创建成功,发送消息给MQ2、订单服务订单关单监听器(死信之后判断是否关单)3、订单关单成功后发消息给MQ(订单释放直接和库存释放进行绑定)4、库存服务是释放库存监听器监听是否解锁库存队列(stock.release.stock.queue)5、库存解锁处理逻辑

这里出现了两个交换机绑定同一个队列的情况,即订单的交换机和库存的队列绑定在一起了。

2、订单关单

1、订单释放直接和库存释放进行绑定 gulimall-order/xxx/order/config/MyMQConfig.java

package com.atguigu.gulimall.order.config; import com.atguigu.gulimall.order.entity.OrderEntity; import com.rabbitmq.client.AMQP; /** * @author: kaiyi * @create: 2020-09-16 13:53 */ @Configuration public class MyMQConfig { /* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */ /** * 客户端监听队列(测试) * @param orderEntity * @param channel * @param message * @throws IOException */ /* @RabbitListener(queues = "order.release.order.queue") public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn()); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } */ /** * 死信队列 * * @return */ @Bean public Queue orderDelayQueue(){ /* Queue(String name, 队列名字 boolean durable, 是否持久化 boolean exclusive, 是否排他 boolean autoDelete, 是否自动删除 Map arguments) 属性 */ HashMap arguments = new HashMap(); arguments.put("x-dead-letter-exchange", "order-event-exchange"); // 信死了交给哪个交换机 arguments.put("x-dead-letter-routing-key", "order.release.order"); // 信死了交给哪个路由key arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; } /** * 普通队列 * * @return */ @Bean public Queue orderReleaseQueue(){ Queue queue = new Queue("order.release.order.queue", true, false, false); return queue; } /** * TopicExchange * * @return */ @Bean public Exchange orderEventExchange(){ /* * String name, * boolean durable, * boolean autoDelete, * Map arguments * */ return new TopicExchange("order-event-exchange", true, false); } @Bean public Binding orderCreateBinding() { /* * String destination, 目的地(队列名或者交换机名字) * DestinationType destinationType, 目的地类型(Queue、Exhcange) * String exchange, * String routingKey, * Map arguments * */ return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", // 路由key一般为事件名 null); } @Bean public Binding orderReleaseBinding() { return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null); } /** * 订单释放直接和库存释放进行绑定 * @return */ @Bean public Binding orderReleaseOtherBinding() { return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null); } }

2、提交订单增加订单创建成功,发送消息给MQ gulimall-order/xxx/order/service/impl/OrderServiceImpl.java

/** * 提交订单 * @param vo * @return */ // @Transactional(isolation = Isolation.READ_COMMITTED) 设置事务的隔离级别 // @Transactional(propagation = Propagation.REQUIRED) 设置事务的传播级别 @Transactional(rollbackFor = Exception.class) // @GlobalTransactional(rollbackFor = Exception.class) @Override public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) { confirmVoThreadLocal.set(vo); SubmitOrderResponseVo responseVo = new SubmitOrderResponseVo(); //去创建、下订单、验令牌、验价格、锁定库存... //获取当前用户登录的信息 MemberResponseVo memberResponseVo = LoginUserInterceptor.loginUser.get(); responseVo.setCode(0); //1、验证令牌是否合法【令牌的对比和删除必须保证原子性】 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; String orderToken = vo.getOrderToken(); //通过lua脚本原子验证令牌和删除令牌 Long result = redisTemplate.execute(new DefaultRedisScript(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()), orderToken); if (result == 0L) { //令牌验证失败 responseVo.setCode(1); return responseVo; } else { //令牌验证成功 //1、创建订单、订单项等信息 OrderCreateTo order = createOrder(); //2、验证价格 BigDecimal payAmount = order.getOrder().getPayAmount(); BigDecimal payPrice = vo.getPayPrice(); if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) { //金额对比 //TODO 3、保存订单 saveOrder(order); //4、库存锁定,只要有异常,回滚订单数据 //订单号、所有订单项信息(skuId,skuNum,skuName) WareSkuLockVo lockVo = new WareSkuLockVo(); lockVo.setOrderSn(order.getOrder().getOrderSn()); //获取出要锁定的商品数据信息 List orderItemVos = order.getOrderItems().stream().map((item) -> { OrderItemVo orderItemVo = new OrderItemVo(); orderItemVo.setSkuId(item.getSkuId()); orderItemVo.setCount(item.getSkuQuantity()); orderItemVo.setTitle(item.getSkuName()); return orderItemVo; }).collect(Collectors.toList()); lockVo.setLocks(orderItemVos); //TODO 调用远程锁定库存的方法 //出现的问题:扣减库存成功了,但是由于网络原因超时,出现异常,导致订单事务回滚,库存事务不回滚(解决方案:seata) //为了保证高并发,不推荐使用seata,因为是加锁,并行化,提升不了效率,可以发消息给库存服务 R r = wmsFeignService.orderLockStock(lockVo); if (r.getCode() == 0) { //锁定成功 responseVo.setOrder(order.getOrder()); // int i = 10/0; // 抛出异常,测试远程回滚 //TODO 订单创建成功,发送消息给MQ rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder()); //删除购物车里的数据 redisTemplate.delete(CartConstant.CART_PREFIX + memberResponseVo.getId()); return responseVo; } else { //锁定失败 String msg = (String) r.get("msg"); throw new NoStockException(msg); // responseVo.setCode(3); // return responseVo; } } else { responseVo.setCode(2); return responseVo; } } }

3、关单监听 gulimall-order/xxx/order/listener/OrderCloseListener.java

package com.atguigu.gulimall.order.listener; import com.atguigu.gulimall.order.entity.OrderEntity; import com.atguigu.gulimall.order.service.OrderService; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; /** * 关单监听 * * @author: kaiyi * @create: 2020-09-17 11:01 */ @RabbitListener(queues = "order.release.order.queue") @Service public class OrderCloseListener { @Autowired private OrderService orderService; @RabbitHandler public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn()); try { orderService.closeOrder(orderEntity); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } } }

4、关单成功给库存服务发送MQ消息 gulimall-order/xxx/order/service/impl/OrderServiceImpl.java

/** * 关闭订单 * @param orderEntity */ @Override public void closeOrder(OrderEntity orderEntity) { //关闭订单之前先查询一下数据库,判断此订单状态是否已支付 OrderEntity orderInfo = this.getOne(new QueryWrapper(). eq("order_sn",orderEntity.getOrderSn())); if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) { //代付款状态进行关单 OrderEntity orderUpdate = new OrderEntity(); orderUpdate.setId(orderInfo.getId()); orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode()); this.updateById(orderUpdate); // 发送消息给MQ OrderTo orderTo = new OrderTo(); BeanUtils.copyProperties(orderInfo, orderTo); try { //TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息 rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo); } catch (Exception e) { //TODO 定期扫描数据库,重新发送失败的消息 } } } 3、订单释放和库存释放进行绑定

1、库存服务-库存释放监听器增加订单关单库存释放处理方法 gulimall-ware/xxx/ware/listener/StockReleaseListener.java

package com.atguigu.gulimall.ware.listener; import com.atguigu.common.to.OrderTo; import com.atguigu.common.to.mq.StockLockedTo; /** * 库存解锁监听 * * @desc * 库存锁定成功发送消息到延时队列 stock.locked(路由key),超时TTL,消息进入私信路由,然后转发到解锁库存的队列。 * * @author: kaiyi * @create: 2020-09-16 19:01 */ @Slf4j @RabbitListener(queues = "stock.release.stock.queue") @Service public class StockReleaseListener { @Autowired private WareSkuService wareSkuService; /** * 1、库存自动解锁 * 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁 * * 2、订单失败 * 库存锁定失败 * * 只要解锁库存的消息失败,一定要告诉服务解锁失败 */ @RabbitHandler public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException { log.info("******收到解锁库存的信息******"); try { //当前消息是否被第二次及以后(重新)派发过来了 // Boolean redelivered = message.getMessageProperties().getRedelivered(); //解锁库存 wareSkuService.unlockStock(to); // 手动删除消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { // 解锁失败 将消息重新放回队列,让别人消费 channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } } /** * 订单关单库存释放 * * @param orderTo * @param message * @param channel * @throws IOException */ @RabbitHandler public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException { log.info("******收到订单关闭,准备解锁库存的信息******"); try { wareSkuService.unlockStock(orderTo); // 手动删除消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { // 解锁失败 将消息重新放回队列,让别人消费 channel.basicReject(message.getMessageProperties().getDeliveryTag(),true); } } }

这个监听器既可以处理库存解锁又可以处理订单关单的处理业务,根据参数来决定具体调用哪一个,这是一个重载。

2、具体解锁库存实现 gulimall-ware/xxx/ware/service/impl/WareSkuServiceImpl.java

/** * 防止订单服务卡顿,导致订单状态消息一直改不了,库存优先到期,查订单状态新建,什么都不处理 * 导致卡顿的订单,永远都不能解锁库存 * @param orderTo */ @Transactional(rollbackFor = Exception.class) @Override public void unlockStock(OrderTo orderTo) { String orderSn = orderTo.getOrderSn(); //查一下最新的库存解锁状态,防止重复解锁库存 WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getOrderTaskByOrderSn(orderSn); //按照工作单的id找到所有 没有解锁的库存,进行解锁 Long id = orderTaskEntity.getId(); List list = wareOrderTaskDetailService.list(new QueryWrapper() .eq("task_id", id).eq("lock_status", 1)); for (WareOrderTaskDetailEntity taskDetailEntity : list) { unLockStock(taskDetailEntity.getSkuId(), taskDetailEntity.getWareId(), taskDetailEntity.getSkuNum(), taskDetailEntity.getId()); } } 五、消息丢失、重复、积压等解决方案

高并发场景的分布式事务,我们采用柔性事务+可靠消息+最终一致性方案(异步确保型),可靠性是最重要的,那么如何保证消息的可靠性呢?

1、消息丢失

1、消息发送出去,由于网络问题没有抵达服务器

做好容错方法(try-catch),发送消息可能会网络失败,失败后要有容错机制,可记录到数据库,采用定期扫描重发的方式。做好日志记录,每个消息状态是否都被服务器收到都应该记录做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发

代码示例:

/** * 关闭订单 * @param orderEntity */ @Override public void closeOrder(OrderEntity orderEntity) { //关闭订单之前先查询一下数据库,判断此订单状态是否已支付 OrderEntity orderInfo = this.getOne(new QueryWrapper(). eq("order_sn",orderEntity.getOrderSn())); if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) { //代付款状态进行关单 OrderEntity orderUpdate = new OrderEntity(); orderUpdate.setId(orderInfo.getId()); orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode()); this.updateById(orderUpdate); // 发送消息给MQ OrderTo orderTo = new OrderTo(); BeanUtils.copyProperties(orderInfo, orderTo); try { //TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息 rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo); } catch (Exception e) { //TODO 定期扫描数据库,重新发送失败的消息 // while() 重试次数 } } }

创建消息日志记录表: 在这里插入图片描述 2、消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。

publisher 也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

生产者消息确认回调应该增加日志记录,确认回调成功后修改记录日志的状态: gulimall-order/xxx/order/config/MyRabbitConfig.java

/** * 定制RabbitTemplate * 1、服务收到消息就会回调 * 1、spring.rabbitmq.publisher-confirms: true * 2、设置确认回调 * 2、消息正确抵达队列就会进行回调 * 1、spring.rabbitmq.publisher-returns: true * spring.rabbitmq.template.mandatory: true * 2、设置确认回调ReturnCallback *

* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) */ // @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法 public void initRabbitTemplate() { /** * 1、只要消息抵达Broker就ack=true * correlationData:当前消息的唯一关联数据(这个是消息的唯一id) * ack:消息是否成功收到 * cause:失败的原因 */ //设置确认回调 rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> { /** * 1、做好消息确认机制(publisher,consumer【手动ack】】) * 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍 */ // 服务器收到生产者发送的消息了 // 修改消息的状态 System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]"); }); /** * 2、只要消息没有投递给指定的队列,就触发这个失败回调 * message:投递失败的消息详细信息 * replyCode:回复的状态码 * replyText:回复的文本内容 * exchange:当时这个消息发给哪个交换机 * routingKey:当时这个消息用哪个路邮键 */ rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" + "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]"); }); }

3、自动ACK的状态下。消费者收到消息,但没来得及消费然后宕机。

一定开启手动ACK,消费成功才移除,失败或者还没来得及处理就 noAck并重新入队。

防止消息丢失记住这两条:

1、做好消息确认机制(publisher,consumer【手动ack】】) 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍

2、消息重复

出现重复的几种情况

1、消息消费成功,事务已经提交,ack时,机器宕机,导致没有ack成功。 Broker的消息重新由 unack 变为ready,并发送给其他消费者 2、消息消费失败,由于重试机制,自动又将消息发送出去。3、成功消费,ack时宕机,消息又unack变为ready,Broker又重新发送

解决方案

消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标识。使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用再处理。rabbitMQ的每一个消息都有 redilivered字段,可以获取是否被重新投递过来的,而不是第一次被投递过来的。 3、消息积压 消费者宕机消费者消费能力不足发送者发送流量太大 上线更多的消费者,进行正常的消费上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3