RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)

您所在的位置:网站首页 rabbitmq如何开启confirm RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)

RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)

#RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)| 来源: 网络整理| 查看: 265

系列文章目录

RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)

RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式

RabbitMQ:第四章:RabbitMQ集群搭建

文章目录系列文章目录前言一、RabbitMQ 基础架构二、6 种工作模式一、理论二、代码三、消息确认机制:confirm状态和return状态一、理论二、代码总结前言

RabbitMQ 简介:RabbitMQ 基于 AMQP 标准,采用 Erlang 语言开发的消息中间件。

提示:以下是本篇文章正文内容

一、RabbitMQ 基础架构rabbitmq基础架构rabbitmq基础架构Producer:作为消息的生成者。Consumer:作为消息的消费者。Connection:消息的发布方或者消息的消费方 和broker 之间的 TCP 连接。Channel:Channel 是在 connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以 channel之间是完全隔离的,减少了操作系统建立 TCP connection 的开销。Broker:接收和分发消息的应用,RabbitMQ服务就是Message Broker。Virtual host:虚拟机,出于多租户和安全因素设计的,把 AMQP的基本组件划分到一个虚拟的分组中,可以类比mysql数据库会创建很多库,库和库之间是独立的。当多个不同的用户使用同一个RabbitMQserver 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。Queue:队列,消息队列,接收消息、缓存消息,消息最终被送到这里等待 consumer 取走。Binding:绑定,exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据Exchange:交换机,message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。

交换机常用的类型有: Fanout:广播,将消息交给所有绑定到交换机的队列 Direct:定向,把消息交给符合指定routing key 的队列 Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

二、6 种工作模式一、理论

RabbitMQ 提供了 6 种工作模式,简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算消息队列)

简单模式:一个生产者生产消息发送到队列里面,一个消费者从队列里面拿消息,进行消费消息。一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)

简单模式简单模式

说明:类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

Work queues 工作队列模式:一个生产者生产消息发送到队列里面,一个或者多个消费者从队列里面拿消息,进行消费消息。一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)

在这里插入图片描述在这里插入图片描述

说明:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。应用场景:过年过节12306抢票,发短信给用户,可以接入多个短信服务进行发送,提供任务的处理速度。

Pub/Sub 订阅模式 :一个生产者生产消息发送到交换机里面,由交换机处理消息,队列与交换机的任意绑定,将消息指派给某个队列,一个或者多个消费者从队列里面拿消息,进行消费消息。需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。

在这里插入图片描述在这里插入图片描述

说明:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

Routing 路由模式:一个生产者生产消息发送到交换机里面,并且指定一个路由key,队列与交换机的绑定是通过路由key进行绑定的,消费者在消费的时候需要根据路由key从交换机里面拿消息,进行消费消息。需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

在这里插入图片描述在这里插入图片描述

说明:Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。

Topics 通配符模式:一个生产者生产消息发送到交换机里面,并且使用通配符的形式(类似mysql里面的模糊查询,比如想获取一批带有item前缀的数据),队列与交换机的绑定是通过通配符进行绑定的,消费者在消费的时候需要根据根据通配符从交换机里面拿消息,进行消费消息。需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列

在这里插入图片描述在这里插入图片描述

说明:通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词。例如:Lazy.# 能够匹配 Lazy.insert.content或者 Lazy.insert,Lazy.* 只能匹配Lazy.insert。

二、代码在这里插入图片描述在这里插入图片描述

创建一个Maven工程,引入pom依赖:

com.rabbitmq amqp-client 5.3.0 com.google.code.gson gson 2.8.5

创建一个连接Rabbitmq的工具类:

import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitUtils { private static ConnectionFactory connectionFactory = new ConnectionFactory(); static { connectionFactory.setHost("你的rabbitmq的ip地址"); connectionFactory.setPort(5672);//RabbitMQ的默认端口号,根据实际情况修改 connectionFactory.setUsername("你的rabbitmq的用户名称"); connectionFactory.setPassword("你的rabbitmq的用户密码"); connectionFactory.setVirtualHost("你的rabbitmq的虚拟机"); } public static Connection getConnection(){ Connection conn = null; try { conn = connectionFactory.newConnection(); return conn; } catch (Exception e) { throw new RuntimeException(e); } } }

简单模式:

为了区分好理解,我每个模式都去创建一个虚拟机,这里我先去rabbitMq管控页面创建一个虚拟机

在这里插入图片描述在这里插入图片描述

修改工具类的虚拟机:

在这里插入图片描述在这里插入图片描述

生产者:

import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { public static void main(String[] args) throws Exception { //获取TCP长连接 Connection conn = RabbitUtils.getConnection(); //创建通信“通道”,相当于TCP中的虚拟连接 Channel channel = conn.createChannel(); //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 //channel.queueDeclare的五个参数 //第一个参数:队列名称ID //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问 //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列 //其他额外的参数, null channel.queueDeclare(RabbitConstant.QUEUE_TEST,false, false, false, null); String message = "要发送的message"; //channel.basicPublish的四个参数 //exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到 //队列名称 //额外的设置属性 //最后一个参数是要传递的消息字节数组 channel.basicPublish("", RabbitConstant.QUEUE_TEST, null,message.getBytes()); channel.close(); conn.close(); System.out.println("===发送成功==="); } }

消费者:

import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static void main(String[] args) throws Exception{ //获取TCP长连接 Connection conn = RabbitUtils.getConnection(); //创建通信“通道”,相当于TCP中的虚拟连接 Channel channel = conn.createChannel(); //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 //第一个参数:队列名称ID //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问 //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列 //其他额外的参数, null channel.queueDeclare(RabbitConstant.QUEUE_TEST,false, false, false, null); //从MQ服务器中获取数据 //创建一个消息消费者 //第一个参数:队列名 //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法 //第三个参数要传入DefaultConsumer的实现类 channel.basicConsume(RabbitConstant.QUEUE_TEST, false, new Reciver(channel)); } } class Reciver extends DefaultConsumer { private Channel channel; //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到 public Reciver(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费者接收到的消息:"+message); System.out.println("消息的TagId:"+envelope.getDeliveryTag()); //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息 channel.basicAck(envelope.getDeliveryTag(), false); }

我先启动消费者后启动生产者,这样只要生产者一生产消息,消费者就可以立马消费。

在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述

Work queues 工作队列模式:

为了区分好理解,我每个模式都去创建一个虚拟机,这里我先去rabbitMq管控页面创建一个虚拟机

在这里插入图片描述在这里插入图片描述

修改工具类的虚拟机

在这里插入图片描述在这里插入图片描述

为了模拟某些业务,这里使用自定义实体类发送消息,所以我新建了一个自定义实体类

/** * 自定义的实体类:发送内容 */ public class SenderContent { private String name; private String content; public SenderContent(String name, String content) { this.name = name; this.content = content; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }

生产者:

import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.google.gson.Gson; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 生成者 */ public class Producer { public static void main(String[] args) throws Exception { Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null); for(int i = 1 ; i


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3