消息中间件 RabbitMQ 之 持久化操作 |
您所在的位置:网站首页 › mysql如何实现持久化 › 消息中间件 RabbitMQ 之 持久化操作 |
3.3 RabbitMQ 持久化
3.3.1 概念
之前的消息应答部分已经看到了如何处理消息不丢失的情况,但是如何保障当 RabbitMQ服务停掉之后消息生产者发送过来的消息不丢失呢? 默认情况下,RabbitMQ退出或者由于某种原因崩溃的时候,它会忽视队列和消息,除非告知它不要这样做。 确保消息不会丢失需要做两件事:将队列和消息都标记为持久化。 3.3.2 队列实现持久化之前创建的队列都是非持久化的,RabbitMQ如果重启,该队列就会被删掉,如果要队列实现持久化,需要在声名队列的时候把 durable 参数(声名队列时的第二个参数)设置为true package com.example.three; import com.example.utils.RabbitUtils; import com.rabbitmq.client.Channel; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; /** * @author 且听风吟 * @version 1.0 * @description: 生产者 * 消息在手动应答时是不丢失的,被丢失时消息会被放回队列中,重新消费 * @date 2022/4/16 0016 15:15 */ public class Task2 { /** * 队列名称 */ public static final String TASK_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { //通过工具类获取信道 Channel channel = RabbitUtils.getChannel(); /** * 生成一个队列,参数的含义如下: * 1.队列名称 * 2.队列里面的消息是否持久化(存储在磁盘上),默认情况false(存储在内存中) * 3.该队列是否进行消息共享,默认false * 4.是否自动删除,最后一个消费者断开连接后,该队列是否自动删除 * 5.其他参数 */ //使消息队列持久化 boolean durable = true; channel.queueDeclare(TASK_NAME,durable,false,false,null); //从控制台中输入信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); /** * 发送一个消息,参数含义如下: * 1.发送到哪个交换机,null表示使用默认交换机 * 2.路由的key值是哪个 本次是队列的名称 * 3.其他参数信息 * 4.发送消息的消息体(发送消息的二进制码) */ channel.basicPublish("",TASK_NAME,null,message.getBytes("UTF-8")); System.out.println("生产者发出消息:"+message); } } }但是需要注意的是,如果之前声名的队列不是持久化的。需要把原先队列先删除,或者重新创建一个持久化的队列,不然会出现错误 received 'true' but current is 'false', class-id=50, method-id=10)控制台删除队列: 以下为控制台中持久化与非持久化的UI显示区: 这个时候即使 重启 RabbitMQ队列也依然存在 3.3.3 消息实现持久化要想让消息实现持久化,需要在消息生产者修改代码。 发送消息方法 basicPublish 中的属性 basicProperties 修改为 MessageProperties.PERSISTENT_TEXT_PLAIN![]() 添加这个属性之后,就会将消息标记为 持久化。 但是并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但这里依然存在当消息刚准备存储在磁盘的时候,还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘,持久性保证并不强。 但是对于简单队列任务而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考后面的 发布确认 章节。 3.3.4 不公平分发RabbitMQ 默认情况下采取 轮训分发消息的方式,这种分发方式会对各个消费者轮流分发消息,看起来很公平,但是在某种场景下这种方法并不是很好。 比如有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另一个消费者处理速度非常慢,这个时候采用轮训分发方式的话,处理速度快的消费者就会有很大一部分的空闲时间,而处理慢的那个消费者就会在一直干活。 显然这种分配方式在这种情况下其实不太好,但是 RabbitMQ并不知道这种情况,依旧公平的进行分发。 为了避免这种情况,可以设置参数 channel.basicQos(1) 修改代码后 RabbitMQ 控制台显示: 修改逻辑: 本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息,另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未存在的消息缓冲区,希望开发人员能够**限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。** 这个时候就可以使用 basic.qos 方法设置 “预取计数” 值来完成。该值定义通道上允许的未确认消息的最大数量。一旦消息数量达到配置的数量,RabbitMQ将停止在通道上传递更多的消息,除非至少有一个未处理的消息被确认 例如,假设在通道上有未确认的消息5、6、7、8,并且通道的预取计数设置为 4,此时RabbitMQ将不会再该通道上传递任何消息,除非至少有一个未应答的消息被 ack。比如说 tag=6 这个消息刚刚被确认ack,RabbitMQ 将会感知这个情况并再发送一条消息。 消息应答 和 Qqs预取值 对用户吞吐量有重大影响。通常,增加预取值将提高向消费者传递消息的速度。 虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息数量也会增加,从而增加了消费者的 RAM消耗(随机存取存储器) |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |