【RabbitMQ】高级使用:消息过期设置、死信队列、延迟队列、流控

您所在的位置:网站首页 查看mq队列消息接收时间 【RabbitMQ】高级使用:消息过期设置、死信队列、延迟队列、流控

【RabbitMQ】高级使用:消息过期设置、死信队列、延迟队列、流控

2024-07-12 22:55| 来源: 网络整理| 查看: 265

1.消息过期时间(TTL)

如果我们想设置消息在指定时间内没被消费就过期,有如下种设置方式:

1.1 Queue TTL

所有队列中的消息超过时间未被消费时都会过期,通过队列属性设置消息过期时间

// 使用SpringAMQP声明队列 @Bean("ttlQueue") public Queue queue(){ Map map = new HashMap(); map.put("x-message-ttl",11000);// 队列中的消息未被消费 11 秒后过期 return new Queue("GP_TTL_QUEUE",true,false,false,map); }

在这里插入图片描述

1.2 Message TTL

设置单条消息的过期时间,在发送消息的时候指定消息属性

// SpringAMQP封装的Message MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("4000"); // 消息的过期属性,单位ms messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message = new Message("这条消息4秒后过期".getBytes(), messageProperties); rabbitTemplate.send("GP_TTL_EXCHANGE", "gupao.ttl", message);

如果同时指定了Message TTL和Queue TTL,则小的那个时间生效

2.死信队列

消息在某些情况下会变成死信(Dead Letter),比如:

消息被消费者拒绝并且未设置重回队列:(NACK|| Reject)&&requeue== false消息过期队列达到最大长度,超过了Maxlength(消息数)或者 Maxlengthbytes(字节数),最先入队的消息会被发送到DLX。

照理说死信应该是被抛弃的,但是如果定义了死信交换机,当消息成为死信就会进入死信队列

DLX:死信交换机(Dead Letter Exchange),队列在创建的时候可以指定一个,实际上也是普通的交换机DLQ:死信队列(DeadLetterQueue),被死信交换机绑定的队列,实际也是普通队列(例如替补球员也是普通球员)

在这里插入图片描述

2.1 死信队列使用

1). 原交换机(GP_ORI_USE_EXCHANGE)、原队列(GP_ORI_USE_QUEUE) ,相互绑定。

设置队列中的消息10秒钟过期,因为没有消费者,会变成死信。指定原队列的死信交换机(GP_DEAD_LETTER_EXCHANGE)。 // 使用Spring AMQP进行声明 @Bean("oriUseExchange") // 原交换机 public DirectExchange exchange() { return new DirectExchange("GP_ORI_USE_EXCHANGE", true, false, new HashMap()); } @Bean("oriUseQueue") // 原队列 public Queue queue() { Map map = new HashMap(); // 10秒钟后成为死信 map.put("x-message-ttl", 10000); // 死信交换机,当前队列中的消息变成死信后进入死信交换机 map.put("x-dead-letter-exchange", "GP_DEAD_LETTER_EXCHANGE"); return new Queue("GP_ORI_USE_QUEUE", true, false, false, map); } @Bean // 绑定原队列到原交换机 public Binding binding(@Qualifier("oriUseQueue") Queue queue,@Qualifier("oriUseExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("gupao.ori.use"); }

2). 声明死信交换机 ( GP_DEAD_LETTER_EXCHANGE ) 、 死信队列(GP_DEAD_LETTER_QUEUE),相互绑定

@Bean("deatLetterExchange") // 队列的死信交换机 public TopicExchange deadLetterExchange() { return new TopicExchange("GP_DEAD_LETTER_EXCHANGE", true, false, new HashMap()); } @Bean("deatLetterQueue") // 死信队列 public Queue deadLetterQueue() { return new Queue("GP_DEAD_LETTER_QUEUE", true, false, false, new HashMap()); } @Bean // 绑定死信队列到死信交换机 public Binding bindingDead(@Qualifier("deatLetterQueue") Queue queue,@Qualifier("deatLetterExchange") TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("#"); // 无条件路由 }

3). 最终消费者监听死信队列 4). 生产者发送消息

在这里插入图片描述

3.延迟队列

我们在实际业务中有一些需要延时发送消息的场景,例如:

家里有一台智能热水器,需要在30分钟后启动未付款的订单,15分钟后关闭

RabbitMQ本身不支持延迟队列,总的来说有三种实现方案:

先存储到数据库,用定时任务扫描利用RabbitMQ的死信队列(Dead Letter Queue)实现利用rabbitmq-delayed-message-exchange插件 3.1 TTL+DLX

基于消息TTL,我们来看一下如何利用死信队列(DLQ)实现延迟队列:

创建一个交换机创建一个队列,与上述交换机绑定,并且通过属性指定队列的死信交换机。创建一个死信交换机创建一个死信队列将死信交换机绑定到死信队列消费者监听死信队列

消息的流转流程:生产者 --> 原交换机 --> 原队列(超过TTL之后)–> 死信交换机 --> 死信队列 --> 最终消费者

使用死信队列实现延时消息的缺点:

如果统一用队列来设置消息的 TTL,当梯度非常多的情况下,比如 1 分钟,2分钟,5分钟,10分钟,20分钟,30分钟……需要创建很多交换机和队列来路由消息。如果单独设置消息的TTL,则可能会造成队列中的消息阻塞——前一条消息没有出队(没有被消费),后面的消息无法投递(比如第一条消息过期TTL是30min,第二条消息TTL是10min。10分钟后,即使第二条消息应该投递了,但是由于第一条消息还未出队,所以无法投递)。可能存在一定的时间误差 3.2 延迟队列插件

在 RabbitMQ 3.5.7 及 以后的版 本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延时队列功能。同时插件依赖Erlang/OPT 18.0及以上。

插件源码地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange 插件下载地址:https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

1). 进入插件目录

whereis rabbitmq cd/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins

2). 下载插件

wget https://bintray.com/rabbitmq/community-plugins/download_file? file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

如果下载的文件名带问号则需要改名,如图

在这里插入图片描述

mv download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez rabbitmq_delayed_message_exchange-0.0.1.ez

在这里插入图片描述

3). 启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4). 停用插件

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

5). 使用插件

通过声明一个x-delayed-message类型的Exchange来使用delayed-messaging特性。 x-delayed-message是插件提供的类型,并不是rabbitmq本身的(区别于direct、topic、fanout、headers)。

在这里插入图片描述

@Bean("delayExchange") public TopicExchange exchange() { Map argss = new HashMap(); argss.put("x-delayed-type", "direct"); return new TopicExchange("GP_DELAY_EXCHANGE", true, false, argss); }

生产者:消息属性中指定x-delay参数(SpringAMQP)

MessageProperties messageProperties = new MessageProperties(); // 延迟的间隔时间,目标时刻减去当前时刻 messageProperties.setHeader("x-delay", delayTime.getTime() - now.getTime()); Message message = new Message(msg.getBytes(), messageProperties); // 不能在本地测试,必须发送消息到安装了插件的服务端 rabbitTemplate.send("GP_DELAY_EXCHANGE", "#", message); 4.服务端流控

当RabbitMQ生产MQ消息的速度远大于消费消息的速度时,会产生大量的消息堆积,占用系统资源,导致机器的性能下降。我们想要控制服务端接收的消息的数量,应该怎么做呢?

4.1 长度控制

队列有两个控制长度的属性:

x-max-length:队列中最大存储最大消息数,超过这个数量,队头的消息会被丢弃。x-max-length-bytes:队列中存储的最大消息容量(单位bytes),超过这个容量,队头的消息会被丢弃。

在这里插入图片描述

需要注意的是,设置队列长度只在消息堆积的情况下有意义,而且会删除先入队的消息,不能真正地实现服务端限流。

4.2 内存控制

RabbitMQ 会在启动时检测机器的物理内存数值。默认当 MQ 占用 40% 以上内存时,MQ 会主动抛出一个内存警告并阻塞所有连接(Connections)。可以通过修改rabbitmq.config 文件来调整内存阈值,默认值是 0.4,如下所示:

[{rabbit,[{vm_memory_high_watermark,0.4}]}].

也可以用命令动态设置,如果设置成0,则所有的消息都不能发布

rabbitmqctl set_vm_memory_high_watermark 0.3 4.3 磁盘控制

另一种方式是通过磁盘来控制消息的发布。当磁盘空间低于指定的值时(默认50MB),触发流控措施。

例如:指定为磁盘的30%或者2GB

disk_free_limit.relative=3.0 disk_free_limit.absolute=2GB

更多相关配置内容可以参考 官网…

5.消费端流控

默认情况下,如果不进行配置,RabbitMQ会尽可能快速地把队列中的消息发送到消费者。因为消费者会在本地缓存消息,如果消息数量过多,可能会导致OOM或者影响其他进程的正常运行。

在消费者处理消息的能力有限,例如消费者数量太少,或者单条消息的处理时间过长的情况下,如果我们希望在一定数量的消息消费完之前,不再推送消息过来,就要用到消费端的流量限制措施。

可以基于Consumer或者channel设置prefetch count的值,含义为Consumer端的最大的unackedmessages数目。当超过这个数值的消息未被确认,RabbitMQ会停止投递新的消息给该消费者。

channel.basicQos(2);// 如果超过 2 条消息没有发送 ACK,当前消费者不再接受队列消息 channel.basicConsume(QUEUE_NAME,false,consumer);

Spring Boot配置:

spring.rabbitmq.listener.simple.prefetch=2

关于Consumer Prefetch 官网也给出了相应解释…



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3