RabbitMQ学习笔记

您所在的位置:网站首页 java消息队列mq的实现原理 RabbitMQ学习笔记

RabbitMQ学习笔记

2023-06-04 06:47| 来源: 网络整理| 查看: 265

RabbitMQ消息监听容器

    你好!欢迎来到Java成长笔记,主要是用于相互交流,相互学习,也希望分享能帮到大家,如有错误之处,希望指正,谢谢!

    通过配置消息监听容器,可以对队列或者多个队列进行监听,开发者可以进行相应的业务处理,实现接收和发送数据的自定义处理。

SimpleMessageListenerContainer

1、SimpleMessageListenerContainer进行很多设置,消费者的配置项进行设置,可以满足监听队列(多个队列)、自动启动、自动声明功能。 2、简单消息监听容器     1)设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等。     2)设置消费者数量、最小最大数量、批量消费。 3、设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数。 4、设置消费者标签生成策略、是否独占模式、消费者属性等。 5、设置具体的监听器、消息转換器等等。 6、SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。

基础设置加载,代码展示如下:

/** * 基础设置加载 */ public void init (final SimpleMessageListenerContainer container) { // 同时监听多个队列 container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); // 设置当前的消费者数量和最大消费者数量 container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(5); // 设置是否重回队列 container.setDefaultRequeueRejected(false); // 设置是否自动签收 container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 设置是否监听 container.setExposeListenerChannel(true); // 设置消费端标签策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String s) { return s + "_" + UUID.randomUUID().toString(); } }); }

消息监听端配置,代码展示如下:

@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); this.init(container); // 设置消息监听 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { final String msg = new String(message.getBody()); log.error(" 消费者 --->>> " + msg); } }); return container; } MessageListenerAdapter 适配器方法(一)

指定消息转换器,将字节数组转换为String

代码展示如下:

import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; import java.util.Optional; @Slf4j public class TextMessageConverter implements MessageConverter { /** * @Description: Object转为Message */ @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return new Message(object.toString().getBytes(), messageProperties); } /** * @Description: Message转为Java对象 */ @Override public Object fromMessage(Message message) throws MessageConversionException { final String contentType = message.getMessageProperties().getContentType(); Optional.ofNullable(contentType) .filter(StringUtils::isNotBlank) .filter(x->contentType.contains("test")) .map(z->{ return new String(message.getBody()); }); return message.getBody(); } } // 监听类设置配置 @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); this.init(container); final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); // 方法委托 自定义方法 adapter.setDefaultListenerMethod("consumeMessage"); adapter.setMessageConverter(new TextMessageConverter()); container.setMessageListener(adapter); return container; } public void consumeMessage(byte[] messageBody) { log.error("字节数组方法,消息内容:{}", new String(messageBody)); } // 测试类 @Test public void testSendMessage4Text() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("TextMessageConverter配置mq消息Message".getBytes(), messageProperties); // 发送消息 rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.send("topic002", "rabbit.abc", message); }

如果不配置setDefaultListenerMethod方法,下图为监听类默认方法 MessageListenerAdapter默认方法handleMessage

适配器方法(二)

将队列名称和消息转换方法进行一对一匹配

代码展示如下:

@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); this.init(container); final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setMessageConverter(new TextMessageConverter()); Map queueOrTagToMethodName = Maps.newHashMap(); queueOrTagToMethodName.put("queue001", "method1"); queueOrTagToMethodName.put("queue002", "method2"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); container.setMessageListener(adapter); return container; } // 监听方法 public void method1(String messageBody) { log.error("method1收到消息内容:{} ", messageBody); } public void method2(String messageBody) { log.error("method2收到消息内容:{} ", messageBody); } // 测试类 @Test public void testSendMessage4Text() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("TextMessageConverter配置mq消息Message".getBytes(), messageProperties); // 发送消息 rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.send("topic002", "rabbit.abc", message); } MessageConverter转换器 JSON格式转换器

设置json格式转换器

代码展示如下:

// 支持json格式转换器 @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); this.init(container); final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter); return container; } // 测试类 @Test public void testSendJsonMessage() throws Exception { final Order order = new Order(UUID.randomUUID().toString().replaceAll("-",""), "订单详情", "订单描述信息"); final String json = new ObjectMapper().writeValueAsString(order); log.error("testSendJsonMessage json: " + json); final MessageProperties messageProperties = new MessageProperties(); // 需要修改contentType为 application/json messageProperties.setContentType("application/json"); final Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); } // 监听类 public void consumeMessage(Map messageBody) { log.error("consumeMessage方法,消息内容:{} ", messageBody); } Java对象转换

设置java对象转换

代码展示如下:

@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); this.init(container); MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter); return container; } // 监听类 public void consumeMessage(Order order) { log.error("consumeMessage order对象,消息内容,id:{} ", order.getId(),",name:{} ", order.getName(), ", content:{} ", order.getContent()); } // 测试类 @Test public void testSendJavaMessage() throws Exception { final Order order = new Order(UUID.randomUUID().toString().replaceAll("-",""), "订单详情", "订单描述信息"); final String json = new ObjectMapper().writeValueAsString(order); log.error("testSendJavaMessage json: " + json); final MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType("application/json"); messageProperties.getHeaders().put("__TypeId__", "com.show.model.Order"); final Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); } 支持java对象多映射转换

设置java对象多映射转换

代码展示如下:

@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); this.init(container); final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); final Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); final DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); final Map idClassMapping = Maps.newHashMap(); idClassMapping.put("order", com.show.model.Order.class); idClassMapping.put("packaged", com.show.model.Packaged.class); javaTypeMapper.setIdClassMapping(idClassMapping); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter); return container; } // 监听类 public void consumeMessage(Order order) { log.error("consumeMessage order对象,id{},name{},content{}", order.getId(), order.getName(), order.getContent()); } public void consumeMessage(Packaged pack) { log.error("consumeMessage package对象,id{},name{},content{}", pack.getId(), pack.getName(), pack.getDescription()); } // 测试类 @Test public void testSendMappingMessage() throws Exception { final ObjectMapper mapper = new ObjectMapper(); // Order类处理 final Order order = new Order(UUID.randomUUID().toString().replaceAll("-",""), "订单详情", "订单描述信息"); final String json1 = mapper.writeValueAsString(order); log.error("testSendMappingMessage json1: " + json1); final MessageProperties messageProperties1 = new MessageProperties(); messageProperties1.setContentType("application/json"); messageProperties1.getHeaders().put("__TypeId__", "order"); final Message message1 = new Message(json1.getBytes(), messageProperties1); rabbitTemplate.send("topic001", "spring.order", message1); // Packaged类处理 final Packaged pack = new Packaged(UUID.randomUUID().toString().replaceAll("-",""), "包裹详情", "包裹描述信息"); final String json2 = mapper.writeValueAsString(pack); log.error("testSendMappingMessage json2: " + json2); final MessageProperties messageProperties2 = new MessageProperties(); messageProperties2.setContentType("application/json"); messageProperties2.getHeaders().put("__TypeId__", "packaged"); final Message message2 = new Message(json2.getBytes(), messageProperties2); rabbitTemplate.send("topic001", "spring.pack", message2); } Java多对象映射转换

设置java对象多映射转换

代码展示如下:

@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); this.init(container); final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); final Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); final DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); final Map idClassMapping = Maps.newHashMap(); idClassMapping.put("order", com.show.model.Order.class); idClassMapping.put("packaged", com.show.model.Packaged.class); javaTypeMapper.setIdClassMapping(idClassMapping); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter); return container; } // 监听类 public void consumeMessage(Order order) { log.error("consumeMessage order对象,id{},name{},content{}", order.getId(), order.getName(), order.getContent()); } public void consumeMessage(Packaged pack) { log.error("consumeMessage package对象,id{},name{},content{}", pack.getId(), pack.getName(), pack.getDescription()); } // 测试类 @Test public void testSendMappingMessage() throws Exception { final ObjectMapper mapper = new ObjectMapper(); final Order order = new Order(UUID.randomUUID().toString().replaceAll("-",""), "订单详情", "订单描述信息"); final String json1 = mapper.writeValueAsString(order); log.error("testSendMappingMessage json1: " + json1); final MessageProperties messageProperties1 = new MessageProperties(); messageProperties1.setContentType("application/json"); messageProperties1.getHeaders().put("__TypeId__", "order"); final Message message1 = new Message(json1.getBytes(), messageProperties1); rabbitTemplate.send("topic001", "spring.order", message1); final Packaged pack = new Packaged(UUID.randomUUID().toString().replaceAll("-",""), "包裹详情", "包裹描述信息"); final String json2 = mapper.writeValueAsString(pack); log.error("testSendMappingMessage json2: " + json2); final MessageProperties messageProperties2 = new MessageProperties(); messageProperties2.setContentType("application/json"); messageProperties2.getHeaders().put("__TypeId__", "packaged"); final Message message2 = new Message(json2.getBytes(), messageProperties2); rabbitTemplate.send("topic001", "spring.pack", message2); } 多对象转换包含image和pdf转换

设置多对象转换包含image和pdf转换

代码展示如下:

@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); this.init(container); final MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); // 全局的转换器 ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter(); final TextMessageConverter textConvert = new TextMessageConverter(); convert.addDelegate("text", textConvert); convert.addDelegate("html/text", textConvert); convert.addDelegate("xml/text", textConvert); convert.addDelegate("text/plain", textConvert); final Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter(); convert.addDelegate("json", jsonConvert); convert.addDelegate("application/json", jsonConvert); final ImageMessageConverter imageConverter = new ImageMessageConverter(); convert.addDelegate("image/jpg", imageConverter); convert.addDelegate("image", imageConverter); final PDFMessageConverter pdfConverter = new PDFMessageConverter(); convert.addDelegate("application/pdf", pdfConverter); adapter.setMessageConverter(convert); container.setMessageListener(adapter); return container; } // PDF监听类 import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.UUID; @Slf4j public class PDFMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { log.error("PDF MessageConverter:{}", System.currentTimeMillis()); final byte[] body = message.getBody(); final String fileName = UUID.randomUUID().toString().replaceAll("-",""); final String path = "H:/rebbitmq/" + fileName + ".pdf"; final File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } } // Image监听类 import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.UUID; @Slf4j public class ImageMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { log.error("Image MessageConverter:{}", System.currentTimeMillis()); final Object _extName = message.getMessageProperties().getHeaders().get("extName"); final String extName = _extName == null ? "jpg" : _extName.toString(); final byte[] body = message.getBody(); final String fileName = UUID.randomUUID().toString().replaceAll("-",""); final String path = "H:/rebbitmq/" + fileName + "." + extName; final File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } } // Image测试类 @Test public void testSendImageExtConverterMessage() throws Exception { byte[] body = Files.readAllBytes(Paths.get("H:/rebbitmq/", "aw.jpg")); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("image/jpg"); messageProperties.getHeaders().put("extName", "jpg"); Message message = new Message(body, messageProperties); rabbitTemplate.send("", "image_queue", message); } // PDF测试类 @Test public void testSendPdfExtConverterMessage() throws Exception { byte[] body = Files.readAllBytes(Paths.get("H:/rebbitmq/", "dell.pdf")); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/pdf"); Message message = new Message(body, messageProperties); rabbitTemplate.send("", "pdf_queue", message); }

本章完结,后续还会持续更新,分享Java成长笔记,希望我们能一起成长。如果你觉得我的分享有用,记得点赞和关注哦!这对我是最好的鼓励。谢谢! PS:转载请注明出处!



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3