RabbitMQ(三)入门

您所在的位置:网站首页 业务模式的四种类型是什么 RabbitMQ(三)入门

RabbitMQ(三)入门

2024-07-10 23:17| 来源: 网络整理| 查看: 265

上一篇文章:RabbitMQ 入门 (二)—— 创建一个基本的消息队列 本文所示代码已上传到github javaWithoutSmoke/rabbitmq-demo

RabbitMQ 入门 —— RabbitMQ的五种模式和四种交换机

文章目录 RabbitMQ 入门 —— RabbitMQ的五种模式和四种交换机六种消息模式RabbitMQ 的四种 ExchangeSimple Work Queue (简单工作队列)Work Queue (工作队列)Publish/Subscribe (发布订阅模式)关于发布订阅的思考 Routing(路由模式)关于直连交换机的思考 Topics (主题模式)Header模式 总结

六种消息模式

而在的 RabbitMQ 中,出现了六种消息传播模式: RabbitMQ 官网说明的六种模式

Simple Work Queue (简单工作队列):也就是常说的点对点模式,一条消息由一个消费者进行消费。(当有多个消费者时,默认使用轮训机制把消息分配给消费者)。Work Queues (工作队列):也叫公平队列,能者多劳的消息队列模型。队列必须接收到来自消费者的 手动ack 才可以继续往消费者发送消息。Publish/Subscribe (发布订阅模式):一条消息被多个消费者消费。Routing(路由模式):有选择的接收消息。Topics (主题模式):通过一定的规则来选择性的接收消息RPC 模式:发布者发布消息,并且通过 RPC 方式等待结果。目前这个应该场景少,而且代码也较为复杂,本章不做细讲。注意:官网最后有 Publisher Confirms 为消息确认机制。指的是生产者如何发送可靠的消息。 RabbitMQ 的四种 Exchange

在了解这些消息模式的时候,引入了一个概念 Exchange(交换机):

在发布订阅里面有对这个概念做解释:

RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。 相反,生产者只能将消息发送到交换机。交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交易所必须确切知道如何处理收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?还是应该丢弃它。规则由交换机类型定义 。

而 Exchange 的类型有下面四种:

direct(直连交换机):将队列绑定到交换机,消息的 routeKey 需要与队列绑定的 routeKey 相同。fanout (扇形交换机):不处理 routeKey ,直接把消息转发到与其绑定的所有队列中。topic(主题交换机):根据一定的规则,根据 routeKey 把消息转发到符合规则的队列中,其中 # 用于匹配符合一个或者多个词(范围更广), * 用于匹配一个词。headers (头部交换机):根据消息的 headers 转发消息而不是根据 routeKey 来转发消息, 其中 header 是一个 Map,也就意味着不仅可以匹配字符串类型,也可以匹配其他类型数据。 规则可以分为所有键值对匹配或者单一键值对匹配。

其实在这里我们差不多可以得出消息模型与 Exchange 的关系比较:

消息模式交换机Simple Work Queue (简单工作队列),Work Queues (工作队列)空交换机Publish/Subscribe (发布订阅模式)fanout (扇形交换机)Routing(路由模式)direct (直连交换机)Topics(主题模式)topic(主题交换机) Simple Work Queue (简单工作队列)

直接看上一篇文章即可 RabbitMQ 入门 (二)—— 创建一个基本的消息队列

Work Queue (工作队列)

我们到RabbitMQ 里面新增一个队列名为 work-queue在这里插入图片描述工作队列

生产者 /** * 生产者 */ public class Producer { private static final String QUEUE_NAME = "work-queue"; public static void main(String[] args) throws IOException, TimeoutException { while (true) { // System.out.println("请输入消息:"); Scanner scanner = new Scanner(System.in); //1、创建连接 Connection connection = RabbitMQConnection.getConnection(); //2、创建通道 Channel channel = connection.createChannel(); //3、发送消息,这里使用Scanner通过控制台输入的内容来作为消息 //nextLine() 以回车结束当前的输入,会接收空格 String message = scanner.nextLine(); /* 参数说明: exchange:当期尚未指定exchange,又不能为null,这里用空字符串表示为一个默认的exchange或者匿名的exchange routingKey: 就是队列名称 props:消息的额外属性 body: 消息主体 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("消息已被发送:" + message); //发送完记得关闭连接 channel.close(); connection.close(); } } } 消费者 工作队列要确定的是有多个消费者。这里我们设置 消费者1 设置处理消息要 1s , 消费者2 处理消息要 3s /** * 消费者1 */ public class Consumer1{ private static final String QUEUE_NAME = "work-queue"; public static void main(String[] args) throws IOException, TimeoutException { // 1、创建连接 Connection connection = RabbitMQConnection.getConnection(); // 2、创建通道 Channel channel = connection.createChannel(); // 3、同一时刻服务器只会发送一条消息给消费者 channel.basicQos(1); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ //接收到一个消息时会使用这个方法,这里进行重写,用来输出接收到的消息 /* 参数说明: consumerTag:消费者关联的标签 envelope: 消息包数据 BasicProperties:消息的额外属性 body: 消息主体,当前为二进制 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { // 模拟处理请求耗时较长的情况 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } String messageBody = new String(body); System.out.println("消费者消费消息:"+messageBody); // 手动确认, // 第一个参数: 默认的消息的唯一标志 // 第二个参数:是否批量.当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; // 4、添加监听,改成手动ack channel.basicConsume(QUEUE_NAME,false, defaultConsumer); } } 消费者2 /** * 消费者2 */ public class Consumer2 { private static final String QUEUE_NAME = "work-queue"; public static void main(String[] args) throws IOException, TimeoutException { // 1、创建连接 Connection connection = RabbitMQConnection.getConnection(); // 2、创建通道 Channel channel = connection.createChannel(); // 3、同一时刻服务器只会发送一条消息给消费者 channel.basicQos(1); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ //接收到一个消息时会使用这个方法,这里进行重写,用来输出接收到的消息 /* 参数说明: consumerTag:消费者关联的标签 envelope: 消息包数据 BasicProperties:消息的额外属性 body: 消息主体,当前为二进制 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { // 模拟处理请求耗时较长的情况 Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } String messageBody = new String(body); System.out.println("消费者消费消息:"+messageBody); // 手动确认, // 第一个参数: 默认的消息的唯一标志 // 第二个参数:是否批量.当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; // 4、添加监听,改成手动ack channel.basicConsume(QUEUE_NAME,false, defaultConsumer); } }

先启动两个消费者,再启动生产者,可以看到,消费者1 处理比较快,所以有多个消息会给他处理。

在这里插入图片描述 在这里插入图片描述

Publish/Subscribe (发布订阅模式)

我们先创建两个队列 subscribe1 和 subscribe2 ,

在这里插入图片描述 还有一个 fanout 类型的交换机 Publish-Subscribe在这里插入图片描述

然后把 Exchange 绑定上两个队列,不绑定的话消息没法投递到队列中

生产者 /** * 生产者 */ public class Producer { private static final String EXCHANGE_NAME = "Publish-Subscribe"; public static void main(String[] args) throws IOException, TimeoutException { for (int i = 1; i 交换机 -> 队列 -> 消费者 』这么一个模式,只不过点对点模式和工作队列模式我们可以理解成是一个匿名的交换机进行投递队列。

Exchange交换机,其实很像我们在做反向代理时用的 nginx 服务器,nginx 复则请求的转发, Exchagne 负责消息的转发。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3