消息中间件 RabbitMQ 之 持久化操作

您所在的位置:网站首页 mysql如何实现持久化 消息中间件 RabbitMQ 之 持久化操作

消息中间件 RabbitMQ 之 持久化操作

2023-10-04 18:15| 来源: 网络整理| 查看: 265

3.3 RabbitMQ 持久化 3.3.1 概念

之前的消息应答部分已经看到了如何处理消息不丢失的情况,但是如何保障当 RabbitMQ服务停掉之后消息生产者发送过来的消息不丢失呢?

默认情况下,RabbitMQ退出或者由于某种原因崩溃的时候,它会忽视队列和消息,除非告知它不要这样做。

确保消息不会丢失需要做两件事:将队列和消息都标记为持久化。

3.3.2 队列实现持久化

之前创建的队列都是非持久化的,RabbitMQ如果重启,该队列就会被删掉,如果要队列实现持久化,需要在声名队列的时候把 durable 参数(声名队列时的第二个参数)设置为true

package com.example.three; import com.example.utils.RabbitUtils; import com.rabbitmq.client.Channel; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; /** * @author 且听风吟 * @version 1.0 * @description: 生产者 * 消息在手动应答时是不丢失的,被丢失时消息会被放回队列中,重新消费 * @date 2022/4/16 0016 15:15 */ public class Task2 { /** * 队列名称 */ public static final String TASK_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { //通过工具类获取信道 Channel channel = RabbitUtils.getChannel(); /** * 生成一个队列,参数的含义如下: * 1.队列名称 * 2.队列里面的消息是否持久化(存储在磁盘上),默认情况false(存储在内存中) * 3.该队列是否进行消息共享,默认false * 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除 * 5.其他参数 */ //使消息队列持久化 boolean durable = true; channel.queueDeclare(TASK_NAME,durable,false,false,null); //从控制台中输入信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); /** * 发送一个消息,参数含义如下: * 1.发送到哪个交换机,null表示使用默认交换机 * 2.路由的key值是哪个 本次是队列的名称 * 3.其他参数信息 * 4.发送消息的消息体(发送消息的二进制码) */ channel.basicPublish("",TASK_NAME,null,message.getBytes("UTF-8")); System.out.println("生产者发出消息:"+message); } } }

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nkQ8BcLY-1650197142212)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220417151752418.png)]

但是需要注意的是,如果之前声名的队列不是持久化的。需要把原先队列先删除,或者重新创建一个持久化的队列,不然会出现错误

received 'true' but current is 'false', class-id=50, method-id=10)

控制台删除队列:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GUhKoWXt-1650197142214)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220417152745907.png)]

以下为控制台中持久化与非持久化的UI显示区:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZwByBuOr-1650197142215)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220417152939665.png)]

这个时候即使 重启 RabbitMQ队列也依然存在

3.3.3 消息实现持久化

要想让消息实现持久化,需要在消息生产者修改代码。

发送消息方法 basicPublish 中的属性 basicProperties 修改为 MessageProperties.PERSISTENT_TEXT_PLAIN[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tlflXmPF-1650197142218)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220417154406099.png)][外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-v0NjhmTd-1650197142219)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220417154445326.png)]

添加这个属性之后,就会将消息标记为 持久化。

但是并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但这里依然存在当消息刚准备存储在磁盘的时候,还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘,持久性保证并不强。

但是对于简单队列任务而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考后面的 发布确认 章节。

3.3.4 不公平分发

RabbitMQ 默认情况下采取 轮训分发消息的方式,这种分发方式会对各个消费者轮流分发消息,看起来很公平,但是在某种场景下这种方法并不是很好。

比如有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另一个消费者处理速度非常慢,这个时候采用轮训分发方式的话,处理速度快的消费者就会有很大一部分的空闲时间,而处理慢的那个消费者就会在一直干活。

显然这种分配方式在这种情况下其实不太好,但是 RabbitMQ并不知道这种情况,依旧公平的进行分发。

为了避免这种情况,可以设置参数 channel.basicQos(1)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WeLX43Vp-1650197142220)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220417161934631.png)]

package com.example.three; import com.example.utils.RabbitUtils; import com.example.utils.SleepUtils; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author 且听风吟 * @version 1.0 * @description: 消费者 C1 * 消息在手动应答时是不丢失的,被丢失时消息会被放回队列中,重新消费 * @date 2022/4/16 0016 15:39 */ public class Work02 { /** * 队列名称 */ public static final String TASK_NAME = "ack_queue"; //接收消息 public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitUtils.getChannel(); System.out.println("C1等待接收消息处理时间较短"); //采用手动应答 //声名 消费者成功消费(接收)消息的回调 DeliverCallback deliverCallback = (consumerTag, message) -> { //沉睡一秒 SleepUtils.sleep(1); //打印消息体 System.out.println("消费者成功消费消息,消息体为:"+new String(message.getBody(),"UTF-8")); /** * 手动应答 参数: * 1.消息的标记,唯一标识 tag * 2.是否批量应答 选择false,处理一个应答一个 */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; //声名 消费者取消消费消息时的回调 CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断"); }; /** * 消费者消费(接收)消息,参数含义如下: * 1.消费哪个队列 * 2.消费之后是否要自动应答 * 3.消费者成功消费消息的回调 * 4.消费者取消消费消息的回调 */ channel.basicQos(1); channel.basicConsume(TASK_NAME,false,deliverCallback,cancelCallback); } }

修改代码后 RabbitMQ 控制台显示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-o6C0GjYL-1650197142221)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220417163515096.png)]

修改逻辑:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xVjrbASO-1650197142222)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220417163621653.png)]

意思就是,如果这个任务消费者还没有处理完或者还没有应答给队列,队列就不要先分配给这个消费者,该消费者目前只能处理一个任务,然后RabbitMQ就会把该任务分配给没有那么忙的空闲的消费者如果所有的消费者都没有完成手上的任务,队列还在不停的添加新任务,有可能会遇到队列被撑满的情况,这个时候就只能添加新的消费者 或者 改变为其他存储任务的策略 3.3.5 预取值/预取计数(prefetch)

本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息,另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未存在的消息缓冲区,希望开发人员能够**限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。**

这个时候就可以使用 basic.qos 方法设置 “预取计数” 值来完成。该值定义通道上允许的未确认消息的最大数量。一旦消息数量达到配置的数量,RabbitMQ将停止在通道上传递更多的消息,除非至少有一个未处理的消息被确认

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FA5Q5bSP-1650197142224)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220417195847157.png)]

例如,假设在通道上有未确认的消息5、6、7、8,并且通道的预取计数设置为 4,此时RabbitMQ将不会再该通道上传递任何消息,除非至少有一个未应答的消息被 ack。比如说 tag=6 这个消息刚刚被确认ack,RabbitMQ 将会感知这个情况并再发送一条消息。

消息应答 和 Qqs预取值 对用户吞吐量有重大影响。通常,增加预取值将提高向消费者传递消息的速度。

虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息数量也会增加,从而增加了消费者的 RAM消耗(随机存取存储器)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-m0Qp9S8W-1650197142224)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220417195719844.png)]



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3