RabbitMq消息丢失原因及其解决方案

您所在的位置:网站首页 rabbitmq丢消息如何处理 RabbitMq消息丢失原因及其解决方案

RabbitMq消息丢失原因及其解决方案

#RabbitMq消息丢失原因及其解决方案| 来源: 网络整理| 查看: 265

RabbitMq消息丢失原因及其解决方案 一、RabbitMQ消息丢失原因

我们首先了解下一条消息从生产到消费的整个流程如下:

WX20211006-121511@2x

生产-->MQ Broker --> 消费。所以这三个环节都有丢失消息的可能。

1.1、生产者丢失消息

生产者将数据发送到rabbitmq的时候,可能因为网络问题导致数据就在半路给搞丢了。

1.使用事务(性能差)

​ RabbitMQ 客户端中与事务机制相关的方法有三个: channel.txSelect 、channel.txCommit 和 channel.txRollback。channel.txSelect 用于将当前的信道设置成事务模式,channel.txCommit 用于提交事务,channel.txRollback 用于事务回滚。在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ 了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback 方法来实现事务回滚。注意这里的 RabbitMQ 中的事务机制与大多数数据库中的事务概念并不相同,需要注意区分。

​ 事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会“吸干”RabbitMQ 的性能。

2.发送回执确认(推荐)

​ 生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 也可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都已经得到了处理,注意辨别这里的确认和消费时候的确认之间的异同。

WX20211007-154639@2x

注意要点:

事务机制和 publisher confirm 机制两者是互斥的,不能共存。

事务机制和 publisher confirm 机制确保的是消息能够正确地发送至 RabbitMQ,这里的“发送至 RabbitMQ”的含义是指消息被正确地发往至 RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。

1.2、RabbitMQ弄丢了数据-开启RabbitMQ的数据持久化

​ 为了防止rabbitmq自己弄丢了数据,这个你必须开启rabbitmq的持久化,就是消息写入之后会持久化到磁盘,哪怕是rabbitmq自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。

  设置持久化有两个步骤,第一个是创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里的数据;第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时rabbitmq就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。

  而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,rabbitmq挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。

  若生产者那边的confirm机制未开启的情况下,哪怕是你给rabbitmq开启了持久化机制,也有一种可能,就是这个消息写到了rabbitmq中,但是还没来得及持久化到磁盘上,结果不巧,此时rabbitmq挂了,就会导致内存里的一点点数据会丢失。

1.3、消费端弄丢了数据

​ 为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

​ 采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。

​ 这里我们可以通过RabbtiMQ 的 Web 管理平台上可以看到当前队列中的“Ready”状态和“Unacknowledged”状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。

二、模拟消费端丢失数据 2.1、初始化工程

这里不做说了,可以看我《SpringBoot集成RabbitMQ》中的相关步骤。这里我们直接写生产者和消费者。

2.2、编写生产者

我们在之前的工程上进行相应的修改,通过http方式生产消息。我们直接在IndexController中新增simpleProducer方法;

内容如下:

/**  * @Author julyWhj  * @Description IndexController$  * @Date 2021/10/7 10:18 上午  **/ @RestController public class IndexController {     @Autowired     private SimpleProducer simpleProducer;     @GetMapping("/index")     public String index() {         return "Hello RabbitMQ";     }     @GetMapping("/producer")     public String simpleProducer() {         for (int i = 0; i 


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3