RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)

您所在的位置:网站首页 授权失败未收到服务器消息 RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)

RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)

2024-07-01 09:17| 来源: 网络整理| 查看: 265

2021-02-18. NC.

我们知道,消息在RabbitMQ的整个生命周期是生产者投递消息到Exchange,Exchange根据路由键将消息路由到合适的Queue,Queue再将消息推(或消费者主动拉)给消费者。

在这个过程当中,Exchange根据路由键将消息路由到合适的Queue的过程,可能发生诸如

Exchange没有任何Queue与其绑定,或者根据消息的路由键,没有任何一个合适的Queue来投递消息,

从而导致消息路由失败。对于这些路由失败的消息应该如何处理呢?有两种方式:

将消息返回给投递该条消息的生产者。使用备份交换机 alternate-exchange(AE)。方式1:将消息返回给投递该条消息的生产者配置代码语言:javascript复制spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=futao spring.rabbitmq.password=123456789 spring.rabbitmq.virtual-host=/tech-sharing # 当exchange无法找到任何一个合适的queue时,将消息return给生产者 spring.rabbitmq.template.mandatory=true # 必须设置为true,否则消息消息路由失败也无法触发Return回调 spring.rabbitmq.publisher-returns=true 交换机定义与消息发送代码语言:javascript复制@Slf4j @Component public class NoMatchQueue { /** * 交换机名称 */ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE"; @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void send() { log.info("发送消息"); Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus()); Message message = MessageBuilder .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8)) .setContentEncoding(StandardCharsets.UTF_8.displayName()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .build(); rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message); } } @Configuration class ExchangeDeclare { /** * 只定义一个交换机,但是不绑定任何Queue,所以发送到该Exchange的消息都会路由失败 * * @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder .topicExchange(NoMatchQueue.EXCHANGE_NAME) .durable(true) .build(); } } 设置回调函数代码语言:javascript复制rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { log.error("消息被退回:{}", returnedMessage); } }); 消息被退回:且可以看到原因是无法路由详细过程请参考: 你可知道publisherReturns参数在spring-boot-starter-amqp中的作用?方式2:使用备份交换机

使用方式1需要我们在程序中进行编码设置回调函数监听,增加了生产者代码的复杂性,那么为了消息不丢失还有没有其他方式来处理路由失败的消息呢:答案是使用备份交换机。

相较于使用回调函数,使用备份交换机只需要给交换机绑定一个备份交换机即可,当消息路由失败之后,消息将投递到备份交换机,再由备份交换机路由消息到备份队列。这样我们只需要关注这个备份队列就能知道/获取到路由失败的消息。通常情况下备份交换的Type应该设置为fanout。配置代码语言:javascript复制spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=futao spring.rabbitmq.password=123456789 spring.rabbitmq.virtual-host=/tech-sharing # 当exchange无法找到任何一个合适的queue时,将消息return给生产者 spring.rabbitmq.template.mandatory=false # 必须设置为true,否则消息消息路由失败也无法触发Return回调 spring.rabbitmq.publisher-returns=false 注意: 使用备份交换机模式,mandatory将无效,即就算mandatory设置为false,路由失败的消息同样会被投递到绑定的备份交换机。正常业务交换机(不绑定队列,使得消息一定会路由失败)代码语言:javascript复制/** * 业务交换机 * * @return */ @Bean public Exchange noMatchQueueExchange() { return ExchangeBuilder .topicExchange(NoMatchQueueAlternateExchange.EXCHANGE_NAME) .durable(true) // 绑定备份交换机 .alternate(X_ALTERNATE) .build(); } 备份交换机/队列/绑定代码语言:javascript复制 /** * 备份队列 * * @return */ @Bean public Queue alternateQueue() { return QueueBuilder .durable("Q_ALTERNATE") .build(); } /** * 备份交换机 * * @return */ @Bean public Exchange alternateExchange() { return ExchangeBuilder .fanoutExchange(X_ALTERNATE) .durable(true) .build(); } /** * 备份绑定 * * @param alternateExchange * @param alternateQueue * @return */ @Bean public Binding alternateBinding(Exchange alternateExchange, Queue alternateQueue) { return BindingBuilder .bind(alternateQueue) .to(alternateExchange) .with("") .noargs(); } 消息投递代码语言:javascript复制/** * 正常业务交换机 */ public static final String EXCHANGE_NAME = "X_NO_MATCH_QUEUE_ALTERNATE"; @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 */ @PostConstruct public void send() { log.info("发送消息"); Order order = new Order(1, BigDecimal.TEN, OrderStatusEnum.UN_PAY.getStatus()); Message message = MessageBuilder .withBody(JSON.toJSONString(order).getBytes(StandardCharsets.UTF_8)) .setContentEncoding(StandardCharsets.UTF_8.displayName()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .build(); rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message); } 结果是消息被路由到备份交换机的备份队列且:如果你同时使用了两种方式,即(mandatory为true+Listener监听)和(备份交换机AlternateExchange),消息将只会路由到备份交换机,不会Return回生产者。

# 在原生RabbitMQ-client中演示这一过程:

代码语言:javascript复制@Slf4j public class AeTest { /** * 获取Channel */ private static final Channel CHANNEL = MqChannelUtils.getChannel(); /** * 备份交换机 */ private static final String X_AE = "X_AE"; /** * 备份交换机绑定的队列 */ private static final String Q_AE = "Q_AE"; /** * 正常业务的交换机 */ private static final String X_1 = "X_1"; public static void main(String[] args) throws IOException { // 定义备份交换机-其实也是一个正常的交换机 CHANNEL.exchangeDeclare(X_AE, BuiltinExchangeType.FANOUT, true); // 定义备份队列 CHANNEL.queueDeclare(Q_AE, true, false, false, null); // 绑定备份 CHANNEL.queueBind(Q_AE, X_AE, ""); HashMap arguments = new HashMap(); // 绑定的备份交换机 arguments.put("alternate-exchange", X_AE); // 定义交换机 CHANNEL.exchangeDeclare(X_1, BuiltinExchangeType.TOPIC, false, false, arguments); // 添加监听器,看看是否还会return消息 CHANNEL.addReturnListener(new ReturnCallback() { @Override public void handle(Return returnMessage) { log.error("消息被退回{}", returnMessage); } }); // 尝试向交换机发送消息(无法路由)- mandatory参数无效 CHANNEL.basicPublish(X_1, "", false, false, new AMQP.BasicProperties(), "阿依古丽".getBytes(StandardCharsets.UTF_8)); } } 两个交换机,正常的交换机X_1和备份交换机X_AE备份交换机绑定的队列已经接收到了路由失败的消息其他要注意的点:备份交换机的Type设置为fanout比较合适,这样可以忽略RoutingKey,避免备份交换机又路由失败。被投递到备份交换机的RoutingKey为消息投递到MQ时的原始RoutingKey,不会变,这一点在其他场景下也是一样的。使用备份交换机模式,mandatory将无效,即就算mandatory设置为false,路由失败的消息同样会被投递到绑定的备份交换机。# 源代码

https://gitee.com/FutaoSmile/tech-sharing-mq

往期推荐

你可知道publisherReturns参数在spring-boot-starter-amqp中的作用?

SpringBoot RabbitMQ实现消息可靠投递

RabbitMQ死信队列在SpringBoot中的使用

使用RabbitMQ实现未支付订单在30分钟后自动过期

SpringBoot如何做到自动帮我们创建RabbitMQ的Queue和Exchange的?

欢迎在评论区留下你看文章时的思考,及时说出,有助于加深记忆和理解,还能和像你一样也喜欢这个话题的读者相遇~



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3