RabbitMQ

您所在的位置:网站首页 ack确认机制 RabbitMQ

RabbitMQ

2023-06-30 02:39| 来源: 网络整理| 查看: 265

RabbitMQ的使用场景 1. 异步处理

应用场景一:缩短调用时间

同步处理:1、注册信息写入数据库;2、发送注册邮件;3、发送注册短信 150S

异步处理【需要等待返回】:1、注册信息写入数据库; 异步发送: 2、发送注册邮件 + 发送注册短信 100S

消息队列【不需要等待返回】:1、注册信息写入数据库; 写入队列:异步读取 2、发送注册邮件;3、发送注册短信 50S【因为 发送邮件+发送短信 不需要等待返回】

2. 应用解耦

应用场景二:应用解耦 例如订单系统 调用库存系统,如果库存系统升级,会导致订单系统要修改源代码 订单系统往消息队列写入一条消息【不关心库存系统的迭代】,由库存主动取出消息【实时订阅消息】

3. 流量控制【削峰】

应用场景三:流量控制【削峰】 秒杀:百万请求 存储到消息队列,由秒杀业务订阅 队列一条条处理,不会导致流量过大使服务器宕机

SpringBoot整合RabbitMQ

引入依赖

org.springframework.boot spring-boot-starter-amqp

RabbitMQ相关配置

spring: rabbitmq: host: 192.168.157.128 port: 5672 virtual-host: /

消息确认机制

为什么不使用事务消息:保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制 publisher confirmCallback 确认模式(消息是否到达Broker消息代理) publisher returnCallback 未投递到 queue 退回模式(只要消息没有投递给指定的队列,就触发这个失败回调) consumer ack机制 (ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。

 1.  ConfirmCallback (确认模式:发送端确认) #开启发送端确认 spring.rabbitmq.publisher-confirms=true @Configuration public class MyRabbitConfig { @Autowired RabbitTemplate rabbitTemplate; @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 定制RabbitTemplate * 1、服务收到消息就会回调 * 1、spring.rabbitmq.publisher-confirms: true * 2、设置确认回调 * 2、消息正确抵达队列就会进行回调 * 1、spring.rabbitmq.publisher-returns: true * spring.rabbitmq.template.mandatory: true * 2、设置确认回调ReturnCallback * * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) * */ @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法 public void initRabbitTemplate() { /** * 1、只要消息抵达Broker就ack=true * correlationData:当前消息的唯一关联数据(这个是消息的唯一id) * ack:消息是否成功收到 * cause:失败的原因 */ //设置确认回调 rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> { System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]"); }); }

 2. returnCallback(回退模式:发送端确认) # 开启发送端消息抵达队列确认 spring.rabbitmq.publisher-returns=true #只要抵达队列, 以异步方式优先回调我们这个returnConfirm spring.rabbitmq.template.mandatory=true /** * 只要消息没有投递给指定的队列,就触发这个失败回调 * message:投递失败的消息详细信息 * replyCode:回复的状态码 * replyText:回复的文本内容 * exchange:当时这个消息发给哪个交换机 * routingKey:当时这个消息用哪个路邮键 */ rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" + "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]"); });

错误消息

Fail Message[(Body:’{“id”:1,“name”:“消息—0”,“sort”:null,“status”:null,“createTime”:1639576478075}’ MessageProperties [headers={TypeId=site.zhourui.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])]>replyCode[312]>replyText[NO_ROUTE]>exchange[test_exchange]>routingKey[test.binding222] confirm…correlationData[null]>ack:[true]>cause:[null]  

3. ack机制(消费端确认)

消费者获取到消息,成功处理,可以回复Ack给Broker

basic.ack用于肯定确认;broker将移除此消息basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量basic.reject用于否定确认;同上,但不能批量

默认自动ack,消息被消费者收到,就会从broker的queue中移除 queue无消费者,消息依然会被存储,直到消费者消费 消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式

        消息处理成功,ack(),接受下一个消息,此消息broker就会移除         消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack         消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人

#手动ack消息 spring.rabbitmq.listener.simple.acknowledge-mode= manual

发送五个消息测试,就算客户端已经拿到了消息,但是没有确认,队列中的消息仍然不能移除,只不过状态由ready变为unacked

此时关闭服务服务,消息的状态由unacked变为ready,下次客户端服务启动又会接收到消息ready变为unacked 

public void basicNack(long deliveryTag, boolean multiple) //channel内按顺序自增 //是否批量确认 相当于channel信道中消息的唯一id public void basicNack(long deliveryTag,boolean multiple,boolean requeue) //确认后是否重新入队 false丢弃 channel.basicNack(deliveryTag,false,true); //手动ack确认拒绝消息 MessageProperties messageProperties = msg.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag();

RabbitMQ延时队列(实现定时任务) 为什么不使用定时任务

定时任务时效性问题

 场景:比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。

常用解决方案:spring的 schedule 定时任务轮询数据库

缺点:消耗系统内存、增加了数据库的压力、存在较大的时间误差

解决:rabbitmq的消息TTL和死信Exchange结合

导致的死信的几种原因:

1. 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false 2. 上面的消息的TTL到了,消息过期了。 3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上

如何实现延时队列 

控制消息在一段时间后变成死信,控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列

延时队列的实现

也可以设置每个消息各自的过期时间

 只需要一个交换机 设置不同的路由键 即可实现

订单服务&库存服务

 订单确认页流程

订单生成页需要完成

判断用户登录信息 远程查询所有的收获地址列表 远程查询购物车所有选中的购物项 远程查询商品库存信息 查询用户积分 价格数据自动计算 防重令牌(防止表单重复提交

feign远程调用丢失请求头

原因:feign发送请求时构造的RequestTemplate没有请求头(该请求头为空),请求参数等信息【cookie没了】

导致在cart服务中,拦截器拦截获取session中的登录信息,获取不到userId【没有cookie】

解决:同步新、老请求(老请求就是/toTrade请求,带有Cookie数据)的cookie

原理: feign在远程调用之前要构造请求,调用很多的拦截器(DEBUG,查看到会调用 拦截器)  

Feign异步情况丢失上下文问题

 

导致拦截器中 空指针异常 1、先在主线程的ThreadLocal中获取 请求头数据      

RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();

 2、再在新线程给ThreadLocal设置 请求头数据【否则获取不到数据,不是同一个线程】

//每一个线程都来共享之前的请求数据【解决异步ThreadLocal 无法共享数据】 RequestContextHolder.setRequestAttributes(requestAttributes);  创建防重令牌

        来实现接口的幂等性

提交订单

下单:去创建订单,验令牌,验价格,锁库存

提交订单成功,则携带返回数据转发至支付页面

提交订单失败,则携带错误信息重定向至确认页

验令牌

        【令牌的对比和删除必须保证原子性】  使用 Redis + LUA 脚本

验价格

        验价格成功之后,保存订单

锁库存

        调用远程锁定库存的方法 可能存在问题  库存成功了 但是网络原因超时了,订单回滚 库存不回滚

订单提交的问题 (本地事务在分布式情况下出现的问题)

分布式情况下,可能出现一些服务事务不一致的情况

远程服务假失败远程服务执行完成后,下面其他方法出现异常

库存扣减成功但是订单业务执行出错,订单业务可以回滚但远程调用的库存服务是办法回滚的 

可以使用Seata来实现分布式事务

但是为了保证高并发,不推荐使用seata,因为是加锁,并行化,提升不了效率,可以发消息给库存服务

推荐使用 消息队列

最终一致性库存解锁逻辑:基于消息队列的分布式事务+分布式表【库存自动解锁】

为库存模块创建业务交换机,队列,绑定(整合Rabbitmq)

 锁定库存之后

        1. 将每一个库存工作详情单 发送一个消息 告诉MQ锁定成功

rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", lockedTo);

        2. 之后这个消息根据发送的路由键 进入 stock.delay.queue 这个死信队列

/** * 延迟队列 * @return */ @Bean public Queue stockDelayQueue() { HashMap arguments = new HashMap(); arguments.put("x-dead-letter-exchange", "stock-event-exchange"); arguments.put("x-dead-letter-routing-key", "stock.release"); // 消息过期时间 2分钟 arguments.put("x-message-ttl", 120000); Queue queue = new Queue("stock.delay.queue", true, false, false,arguments); return queue; }

        3. 等待指定时间之后,该消息过期,成为死信消息,通过路由键stock.release 进入交换机stock-event-exchange。 再根据路由进入普通队列stock.release.stock.queue

        4. 监听死信队列 stock.release.stock.queue。 接收消息

@Service @RabbitListener(queues = "stock.release.stock.queue") public class StockReleaseListener { @Autowired WareSkuService wareSkuService; @RabbitHandler public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException{ System.out.println("收到解锁库存的消息...."); try { wareSkuService.unlockStock(to); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } }

        5. 处理逻辑

解锁  1. 查询数据库关于这个订单的库存信息         有: 证明是库存锁定成功了。 到底要不要解锁:查看订单的情况                 1. 没有这个订单 必须解锁                 2. 有这个订单:                       需要查看订单状态: 已取消 --> 解锁库存                                                         没取消 --> 不能解锁         没有: 库存锁定失败了, 库存回滚了 这种情况无需解锁

最终一致性库存解锁逻辑:基于消息队列的分布式事务+分布式表【订单自动关单】

上面是库存自动解锁的逻辑

下面是订单自动关闭的逻辑

 1. 创建订单之后 发送一个消息给MQ 告诉MQ 订单创建成功

 2.  之后这个消息 根据路由键 order.create.order 进入死信队列 order.delay.queue

@Bean public Queue orderDelayQueue(){ //Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) /* Queue(String name, 队列名字 boolean durable, 是否持久化 boolean exclusive, 是否排他 boolean autoDelete, 是否自动删除 Map arguments) 属性 */ Map arguments = new HashMap(); arguments.put("x-dead-letter-exchange", "order-event-exchange"); arguments.put("x-dead-letter-routing-key", "order.release.order"); arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 Queue queue = new Queue("order.delay.queue", true, false, false, arguments); return queue; }

3. 根据设置的时间 消息过期之后 变成死信消息 通过死信路由order.release.order 。进入队列order.release.order.queue

4. 监听队列order.release.order.queue,拿到消息,进行订单自动关闭的判断

@RabbitListener(queues = "order.release.order.queue") @Service public class OrderCloseListener { @Autowired OrderService orderService; @RabbitHandler public void listener(OrderEntity entity, Channel channel, Message message) throws IOException { System.out.println("收到过期的订单信息:准备关闭订单" + entity.getOrderSn()); try { orderService.closeOrder(entity); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } }

5. 得到消息的内容,查看订单状态。 判断是否需要关闭

public void closeOrder(OrderEntity entity) { // 查询当前这个订单的最新状态 OrderEntity orderEntity = this.getById(entity.getId()); // 关单 if (orderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()){ OrderEntity update = new OrderEntity(); update.setId(entity.getId()); update.setStatus(OrderStatusEnum.CANCLED.getCode()); this.updateById(update); OrderTo orderTo = new OrderTo(); BeanUtils.copyProperties(orderEntity, orderTo); // TODO 发给MQ一个消息 try { // TODO 保证消息一定会发出去 每一个消息都可以做好日志记录(给数据库保存每一个消息的详细信息) // TODO 定期扫描数据库将发送失败的消息再发送一遍 rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo); }catch (Exception e){ // TODO 出现问题之后 将没发送成功的消息进行重试发送 } } }

上面的逻辑是这样的

mq的订单延时队列(1分钟),库存延时队列(2分钟)

下单之后,生成订单消息进入延时队列。1分钟之后消息变为死信消息进入死信队列,监听死信队列拿到消息之后,判断订单是否被支付。如果没有支付,则需要关闭订单操作。

下单之后,产生的锁库存消息也在两分钟之后变为死信消息进入死信队列。监听死信队列拿到消息之后,去查询订单的状态。在做库存解锁的判断。

订单卡顿导致的库存无法解锁

 解决方案

再往订单死信队列发送消息时,同时也往库存死信队列发送相同消息,通知库存解锁

 1. 订单解锁之后 发送消息给路由器 路由键为order.release.other

@Override public void closeOrder(OrderEntity entity) { // 查询当前这个订单的最新状态 OrderEntity orderEntity = this.getById(entity.getId()); // 关单 if (orderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()){ OrderEntity update = new OrderEntity(); update.setId(entity.getId()); update.setStatus(OrderStatusEnum.CANCLED.getCode()); this.updateById(update); OrderTo orderTo = new OrderTo(); BeanUtils.copyProperties(orderEntity, orderTo); // TODO 发给MQ一个消息 try { // TODO 保证消息一定会发出去 每一个消息都可以做好日志记录(给数据库保存每一个消息的详细信息) // TODO 定期扫描数据库将发送失败的消息再发送一遍 rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo); }catch (Exception e){ // TODO 出现问题之后 将没发送成功的消息进行重试发送 } } }

2. 该路由键order.release.other 绑定库存释放的队列stock.release.stock.queue

/** * 订单释放和库存释放直接进行绑定 * @return */ @Bean public Binding orderReleaseOtherBinding(){ return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null); }

3. 监听stock.release.stock.queue, 拿到订单关闭的消息

@RabbitHandler public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException { System.out.println("收到订单关闭的消息.. 准备解锁库存.."); try{ wareSkuService.unlockStock(orderTo); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } }

4. 进行库存解锁

// 防止订单服务卡顿 导致订单状态一直改不了 // 库存消息优先到期 查订单状态是新建状态 什么都不做就走了 // 导致卡顿的订单永远不能解锁库存 @Transactional @Override public void unlockStock(OrderTo orderTo) { String orderSn = orderTo.getOrderSn(); // 查一下最新的库存解锁状态, 防止重复解锁库存 WareOrderTaskEntity task = orderTaskService.getOrderTaskByOrderSn(orderSn); Long taskId = task.getId(); // 按照工作单 找到所有没有解锁的库存 进行解锁 List entities = orderTaskDetailService.list(new QueryWrapper() .eq("task_id", taskId) .eq("lock_status", 1)); for (WareOrderTaskDetailEntity entity : entities){ unLockStock(entity.getSkuId(), entity.getWareId(), entity.getSkuNum(), entity.getId()); } }

所有的完整流程 

如何保证消息可靠性

消息丢失问题

1. 消息发送出去,由于网络问题没有抵达服务器

        做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式        做好日志记录,每个消息状态是否都被服务器收到都应该记录        做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发

2. 消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。         publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。 3. 自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机         一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重新入队

消息重复问题

成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送

消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标志

使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用处理

消息积压问题 消费者宕机积压消费者消费能力不足积压发送者发送流量太大 上线更多的消费者,进行正常消费上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

接口幂等性

接口幂等性就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的, 不会因为多次点击而产生了副作用; 比如说支付场景, 用户购买了商品支付扣款成功, 但是返回结果的时候网络异常, 此时钱已经扣了, 用户再次点击按钮, 此时会进行第二次扣款, 返回结果成功, 用户查询余额返发现多扣钱了, 流水记录也变成了两条. . . ,这就没有保证接口的幂等性

哪些情况需要防止 用户多次点击按钮用户页面回退再次提交微服务互相调用, 由于网络问题, 导致请求失败。 feign 触发重试机制其他业务情况 幂等解决方案 token 机制 (本次使用)         服务端提供了发送 token 的接口。 我们在分析业务的时候, 哪些业务是存在幂等问题的,就必须在执行业务前, 先去获取 token, 服务器会把 token 保存到 redis 中。 然后调用业务接口请求时, 把 token 携带过去, 一般放在请求头部。 服务器判断 token 是否存在 redis 中, 存在表示第一次请求, 然后删除 token,继续执行业务。 如果判断 token 不存在 redis 中, 就表示是重复操作, 直接返回重复标记给 client, 这样就保证了业务代码, 不被重复执行。

Token 获取、 比较和删除必须是原子性



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3