Spring Boot with RabbitMQ: persistent message storage


2024-06-23 09:40 | Source: compiled from the web

Persistent message storage in RabbitMQ involves the following three aspects:

1. Durability of the exchange

2. Durability of the queue

3. Persistence of the message
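These three settings interact. Under RabbitMQ's documented behavior, a queued message can only be recovered after a broker restart if it was published as persistent and it sits in a durable queue; exchange durability only decides whether the exchange itself still exists afterwards. The sketch below is a plain-Java truth table of that rule, not RabbitMQ or Spring code. `RestartModel` and `canSurviveRestart` are hypothetical names introduced for illustration, and the rule is a necessary condition rather than a guarantee (a persistent message can still be lost if the broker fails before writing it to disk).

```java
// Hypothetical, JDK-only model of the durability rule; not part of any RabbitMQ API.
final class RestartModel {

    /** Necessary condition for a queued message to be recovered after a broker restart. */
    static boolean canSurviveRestart(boolean queueDurable, boolean messagePersistent) {
        // A non-durable queue disappears on restart together with its contents;
        // a non-persistent message is not recovered even from a durable queue.
        return queueDurable && messagePersistent;
    }

    public static void main(String[] args) {
        boolean[] flags = {true, false};
        for (boolean queueDurable : flags) {
            for (boolean messagePersistent : flags) {
                System.out.printf("durable queue=%-5b persistent message=%-5b -> recoverable=%b%n",
                        queueDurable, messagePersistent,
                        canSurviveRestart(queueDurable, messagePersistent));
            }
        }
    }
}
```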

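Exchange durability

When declaring an exchange, there is a parameter named durable. When it is true, the exchange is persisted and still exists after the RabbitMQ server restarts. In Spring AMQP, durable defaults to true when only the exchange name is passed, as the name-only constructor of AbstractExchange shows: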

```java
public class DirectExchange extends AbstractExchange {
    public static final DirectExchange DEFAULT = new DirectExchange("");

    public DirectExchange(String name) {
        super(name);
    }

    public DirectExchange(String name, boolean durable, boolean autoDelete) {
        super(name, durable, autoDelete);
    }

    public DirectExchange(String name, boolean durable, boolean autoDelete, Map arguments) {
        super(name, durable, autoDelete, arguments);
    }

    public final String getType() {
        return "direct";
    }
}

public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    private final String name;
    private final boolean durable;
    private final boolean autoDelete;
    private final Map arguments;
    private volatile boolean delayed;
    private boolean internal;

    public AbstractExchange(String name) {
        // Name-only constructor: durable = true, autoDelete = false
        this(name, true, false);
    }

    public AbstractExchange(String name, boolean durable, boolean autoDelete) {
        this(name, durable, autoDelete, (Map)null);
    }

    public AbstractExchange(String name, boolean durable, boolean autoDelete, Map arguments) {
        this.name = name;
        this.durable = durable;
        this.autoDelete = autoDelete;
        if (arguments != null) {
            this.arguments = arguments;
        } else {
            this.arguments = new HashMap();
        }
    }
    // (excerpt: remaining members omitted)
}
```

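Queue durability

Declaring a queue also takes a durable parameter. When it is true, the queue is persisted and still exists after the RabbitMQ server restarts. In Spring AMQP, durable defaults to true when only the queue name is passed: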

```java
public Queue(String name) {
    // Name-only constructor: durable = true, exclusive = false, autoDelete = false
    this(name, true, false, false);
}

public Queue(String name, boolean durable) {
    this(name, durable, false, false, (Map)null);
}

public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
    this(name, durable, exclusive, autoDelete, (Map)null);
}

public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) {
    Assert.notNull(name, "'name' cannot be null");
    this.name = name;
    this.actualName = StringUtils.hasText(name)
            ? name
            : Base64UrlNamingStrategy.DEFAULT.generateName() + "_awaiting_declaration";
    this.durable = durable;
    this.exclusive = exclusive;
    this.autoDelete = autoDelete;
    this.arguments = (Map)(arguments != null ? arguments : new HashMap());
}
```
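The exchange and queue constructors can be used directly when declaring the topology in a Spring Boot application. The following is a minimal configuration sketch, assuming spring-boot-starter-amqp is on the classpath. The exchange name ex.pc and routing key key.pc are taken from the client example in this article, while the queue name queue.pc and the class name are made up for illustration. The durable flags are spelled out even though they match the defaults, so the intent is visible.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DurableTopologyConfig {

    @Bean
    public DirectExchange pcExchange() {
        // durable = true, autoDelete = false
        return new DirectExchange("ex.pc", true, false);
    }

    @Bean
    public Queue pcQueue() {
        // "queue.pc" is a hypothetical name; durable = true, exclusive = false, autoDelete = false
        return new Queue("queue.pc", true, false, false);
    }

    @Bean
    public Binding pcBinding(DirectExchange pcExchange, Queue pcQueue) {
        // Route messages published to ex.pc with key.pc into the durable queue
        return BindingBuilder.bind(pcQueue).to(pcExchange).with("key.pc");
    }
}
```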

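Message persistence

We have covered durability for exchanges and queues. How is a message made persistent?

With the plain RabbitMQ Java client, a message is made persistent by setting deliveryMode to 2 on its BasicProperties: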

```java
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2) // 2 = persistent
        .build();
channel.basicPublish("ex.pc", "key.pc", properties, "hello world".getBytes());
```

How is the same thing achieved with RabbitTemplate once Spring Boot is in the picture?

Start with the send method that is used most often, convertAndSend in the RabbitTemplate class:

```java
@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
    convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}
```

It delegates to an overload of convertAndSend in the same class, which converts the object into a Message:

```java
@Override
public void convertAndSend(String exchange, String routingKey, final Object object,
        @Nullable CorrelationData correlationData) throws AmqpException {
    send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}
```

Note that the conversion step passes in a newly created MessageProperties object:

```java
protected Message convertMessageIfNecessary(final Object object) {
    if (object instanceof Message) {
        // Already a Message: its own MessageProperties are used unchanged
        return (Message) object;
    }
    return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}
```

MessageProperties has a deliveryMode property whose default value is MessageDeliveryMode.PERSISTENT:

```java
public MessageProperties() {
    this.deliveryMode = DEFAULT_DELIVERY_MODE;
    this.priority = DEFAULT_PRIORITY;
}

static {
    DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
    DEFAULT_PRIORITY = 0;
}
```
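The value 2 passed to deliveryMode(2) in the Java client example and the constant MessageDeliveryMode.PERSISTENT describe the same setting: AMQP 0-9-1 defines delivery mode 1 as non-persistent and 2 as persistent. The enum below is a hypothetical, JDK-only stand-in that only illustrates this mapping. It is not Spring's MessageDeliveryMode, and details such as throwing on an unknown value are choices made for this sketch.

```java
// Hypothetical stand-in for illustration; not Spring AMQP's MessageDeliveryMode.
enum DeliveryMode {
    NON_PERSISTENT(1), // AMQP 0-9-1 delivery mode 1
    PERSISTENT(2);     // AMQP 0-9-1 delivery mode 2

    private final int amqpValue;

    DeliveryMode(int amqpValue) {
        this.amqpValue = amqpValue;
    }

    /** Integer value as it appears in BasicProperties.deliveryMode. */
    int toInt() {
        return amqpValue;
    }

    /** Reverse lookup; rejecting unknown values is a choice of this sketch. */
    static DeliveryMode fromInt(int value) {
        for (DeliveryMode mode : values()) {
            if (mode.amqpValue == value) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown AMQP delivery mode: " + value);
    }

    public static void main(String[] args) {
        System.out.println("PERSISTENT -> " + PERSISTENT.toInt());
        System.out.println("1 -> " + fromInt(1));
    }
}
```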

Once the message has been converted, the send method of the same class is called:

```java
@Override
public void send(final String exchange, final String routingKey, final Message message,
        @Nullable final CorrelationData correlationData) throws AmqpException {
    execute(channel -> {
        doSend(channel, exchange, routingKey, message,
                (RabbitTemplate.this.returnCallback != null
                        || (correlationData != null && StringUtils.hasText(correlationData.getId())))
                        && RabbitTemplate.this.mandatoryExpression.getValue(
                                RabbitTemplate.this.evaluationContext, message, Boolean.class),
                correlationData);
        return null;
    }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}
```

That method in turn calls doSend:

```java
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, // NOSONAR complexity
        boolean mandatory, @Nullable CorrelationData correlationData)
        throws Exception { // NOSONAR TODO: change to IOException in 2.2.

    String exch = exchangeArg;
    String rKey = routingKeyArg;
    if (exch == null) {
        exch = this.exchange;
    }
    if (rKey == null) {
        rKey = this.routingKey;
    }
    if (logger.isDebugEnabled()) {
        logger.debug("Publishing message " + message
                + "on exchange [" + exch + "], routingKey = [" + rKey + "]");
    }

    Message messageToUse = message;
    MessageProperties messageProperties = messageToUse.getMessageProperties();
    if (mandatory) {
        messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
    }
    if (this.beforePublishPostProcessors != null) {
        for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
            messageToUse = processor.postProcessMessage(messageToUse, correlationData);
        }
    }
    setupConfirm(channel, messageToUse, correlationData);
    if (this.userIdExpression != null && messageProperties.getUserId() == null) {
        String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
        if (userId != null) {
            messageProperties.setUserId(userId);
        }
    }
    sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
    // Check if commit needed
    if (isChannelLocallyTransacted(channel)) {
        // Transacted channel created by this template -> commit.
        RabbitUtils.commitIfNecessary(channel);
    }
}
```

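Inside doSend we finally reach the operation that publishes to RabbitMQ: sendToRabbit. It converts the MessageProperties object into BasicProperties and passes the result to channel.basicPublish. This is how Spring Rabbit implements message persistence: because MessageProperties defaults to PERSISTENT, messages are persistent by default.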

```java
protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
        Message message) throws IOException {
    // Spring's MessageProperties -> the Java client's BasicProperties (deliveryMode included)
    BasicProperties convertedMessageProperties = this.messagePropertiesConverter
            .fromMessageProperties(message.getMessageProperties(), this.encoding);
    channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
}
```

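How to change a message's persistence setting

As the source analysis above shows, messages sent through Spring are persistent by default. There are three ways to change that:

1. Use the send method to send a Message you build yourself, and set the deliveryMode property on its MessageProperties.

2. Provide a custom MessageConverter that sets the deliveryMode property of MessageProperties during conversion.

3. Provide a custom MessagePropertiesConverter that sets deliveryMode when the MessageProperties object is converted into BasicProperties.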
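The three options correspond to three points along the send path traced above. The sketch below is a simplified, JDK-only model of that path, meant only to show where each override takes effect. Every type in it (`SendPathModel`, `Props`, `Msg`, `Converter`) is a hypothetical stand-in, not a Spring AMQP class, and the real RabbitTemplate does considerably more.

```java
import java.util.function.IntUnaryOperator;

// Hypothetical, JDK-only model of the send path; none of these types are Spring AMQP classes.
final class SendPathModel {

    static final int NON_PERSISTENT = 1;
    static final int PERSISTENT = 2;

    /** Stand-in for MessageProperties: the delivery mode defaults to persistent. */
    static final class Props {
        int deliveryMode = PERSISTENT;
    }

    /** Stand-in for Message: a body plus its properties. */
    static final class Msg {
        final String body;
        final Props props;

        Msg(String body, Props props) {
            this.body = body;
            this.props = props;
        }
    }

    /** Stand-in for MessageConverter.toMessage(object, messageProperties). */
    interface Converter {
        Msg toMessage(Object payload, Props props);
    }

    // Hook for option 2: how a payload becomes a message.
    Converter converter = (payload, props) -> new Msg(String.valueOf(payload), props);

    // Hook for option 3: how the message's delivery mode is mapped to the published value.
    IntUnaryOperator propertiesConverter = mode -> mode;

    /** Mirrors convertAndSend: convert with fresh default properties, then send. */
    int convertAndSend(Object payload) {
        return send(converter.toMessage(payload, new Props()));
    }

    /** Mirrors send -> doSend -> sendToRabbit; returns the delivery mode that would be published. */
    int send(Msg message) {
        return propertiesConverter.applyAsInt(message.props.deliveryMode);
    }

    /** Default path: nothing customized. */
    static int defaultMode() {
        return new SendPathModel().convertAndSend("hello world");
    }

    /** Option 1: build the message yourself and pass it to send. */
    static int viaSend() {
        Props props = new Props();
        props.deliveryMode = NON_PERSISTENT;
        return new SendPathModel().send(new Msg("hello world", props));
    }

    /** Option 2: a custom converter changes the properties during conversion. */
    static int viaConverter() {
        SendPathModel template = new SendPathModel();
        template.converter = (payload, props) -> {
            props.deliveryMode = NON_PERSISTENT;
            return new Msg(String.valueOf(payload), props);
        };
        return template.convertAndSend("hello world");
    }

    /** Option 3: a custom properties converter overrides the value at publish time. */
    static int viaPropertiesConverter() {
        SendPathModel template = new SendPathModel();
        template.propertiesConverter = mode -> NON_PERSISTENT;
        return template.convertAndSend("hello world");
    }

    public static void main(String[] args) {
        System.out.println("default:                     " + defaultMode());
        System.out.println("1. send with own properties: " + viaSend());
        System.out.println("2. custom converter:         " + viaConverter());
        System.out.println("3. custom props converter:   " + viaPropertiesConverter());
    }
}
```

In Spring AMQP itself, the matching calls are MessageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) before RabbitTemplate.send for option 1, RabbitTemplate.setMessageConverter for option 2, and RabbitTemplate.setMessagePropertiesConverter for option 3. Option 1 affects a single message, whereas options 2 and 3 change the behavior of every message sent through that template.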


