深入了解ActiveMQ!

您所在的位置:网站首页 activemq版本怎么看 深入了解ActiveMQ!

深入了解ActiveMQ!

2023-10-08 12:11| 来源: 网络整理| 查看: 265

认识MQ(Message Queue)什么是消息队列

aHR0cHM6Ly9tbWJpei5xcGljLmNuL21tYml6X3BuZy9QeE16VDBPaWJmNGpONDBYQThKdDNaTVlyRE54b2NQSUZrUnRsbHhEYUlCdTZSbXdPaWNDMWQ1SW1tS0tZSm9sWVBlZXJvcEF1WW9UcnBEUWdZU1pKQXZ3LzY0MA.png

消息队列

首先我们先从以下几个维度来认识一下消息队列:

消息队列:一般我们会简称它为MQ(MessageQueue)消息(Message):传输的数据。队列(Queue):队列是一种先进先出的数据结构。消息队列从字面的含义来看就是一个存放消息的容器。消息队列可以简单理解为:把要传输的数据放在队列中。把数据放到消息队列叫做生产者。从消息队列里边取数据叫做消费者。

为什么需要消息队列

使用消息队列主要是基于以下三个主要场景:

解耦

异步

削峰/限流

下面我们分场景来描述下使用消息队列带来的好处

解耦

假设我们有一个用户系统A,用户系统A可以产生一个userId。

然后,现在有系统B和系统C都需要这个userId去做相关的操作。

aHR0cHM6Ly9tbWJpei5xcGljLmNuL21tYml6X3BuZy9QeE16VDBPaWJmNGpONDBYQThKdDNaTVlyRE54b2NQSUZVaWNNRzZDVVJXTkx6Z1hHazVyU0dobEdZeXBhZ3ZWR3pIZkxrMlhqakwwSlUyd0w5WUNqU0JBLzY0MA.png

解耦前架构

伪码大致如下:

java public class SystemA { // 系统B和系统C的依赖 SystemB systemB = new SystemB(); SystemC systemC = new SystemC(); // 系统A独有的数据userId private String userId = "activeMq-1234567890"; public void doSomething() { // 系统B和系统C都需要拿着系统A的userId去操作其他的事 systemB.SystemBNeed2do(userId); systemC.SystemCNeed2do(userId); } }

「这样类似的业务场景大家是不是很熟悉,大家是不是这样写很合情合理,也很简单。」

某一天,系统B的负责人告诉系统A的负责人,现在系统B的SystemBNeed2do(String userId)这个接口不再使用了,让系统A别去调它了。

于是,系统A的负责人说"好的,那我就不调用你了。",于是就把调用系统B接口的代码给删掉了。代码变成这样了:

java public void doSomething() { // 系统A不再调用系统B的接口了 //systemB.SystemBNeed2do(userId); systemC.SystemCNeed2do(userId); }

由于业务需要,系统D说也需要用到系统A的userId,于是代码改成了这样:

java public void doSomething() { // 已经不再需要系统B的依赖了 //systemB.SystemBNeed2do(userId); // 系统C和系统D都需要拿着系统A的userId去操作其他的事 systemC.SystemCNeed2do(userId); systemD.SystemDNeed2do(userId); }

当前系统A、B、C、D系统的交互是这样子的。

aHR0cHM6Ly9tbWJpei5xcGljLmNuL21tYml6X3BuZy9QeE16VDBPaWJmNGpONDBYQThKdDNaTVlyRE54b2NQSUY4Njh2bFpNdExZczlWd3U4U2VaT0VQZUNyV1ZpYkhqM0xnZlZ2amtIZzN4NFA2NnV6amM2VmV3LzY0MA.png

系统交互

随着业务需求的变化,代码也要一遍一遍的修改。

还会存在另外一个问题,调用系统C的时候,如果系统C挂了,系统A还要想办法处理。如果调用系统D时,由于网络延迟,请求超时了,那系统A是反馈fail还是重试?

那么怎么去解决这样的现状呢,如何从频繁的修改代码中解脱呢?

这时候我们就引入一层消息队列中间件,交互图如下:

1.png

解耦

将系统A产生的userId写到消息队列中,系统C和系统D从消息队列中拿数据。

这样有什么好处?

系统A只负责把数据写到队列中,谁想要或不想要这个数据(消息),系统A一点都不关心。

即便现在系统D不想要userId这个数据了,系统B又突然想要userId这个数据了,都跟系统A无关,系统A一点代码都不用改。

系统D拿userId不再经过系统A,而是从消息队列里边拿。系统D即便挂了或者请求超时,都跟系统A无关,

只跟消息队列有关。这样一来,系统A与系统B、C、D都解耦了。

异步

系统A做的是主要的业务,而系统B、C、D是非主要的业务。比如系统A处理的是订单下单,而系统B是订单下单成功了,那发送一条短信告诉具体的用户此订单已成功,而系统C和系统D也是处理一些小事而已。

那么此时,为了提高用户体验和吞吐量,其实可以异步地调用系统B、C、D的接口。

aHR0cHM6Ly9tbWJpei5xcGljLmNuL21tYml6X3BuZy9QeE16VDBPaWJmNGpONDBYQThKdDNaTVlyRE54b2NQSUZGS0NiWmFmZjBHdDZGTUtMZVV0cGlhZmZQUnA2ZjFIZW95NDBPNWljNmxyZjZKZjdjSktLVk94QS82NDA.png

异步

削峰/限流

我们再来一个场景,现在我们每个月要搞一次大促,大促期间的并发可能会很高的,比如每秒3000个请求。假设我们现在有两台机器处理请求,并且每台机器只能每次处理1000个请求。

3.png

削峰前

系统B和系统C根据自己的能够处理的请求数去消息队列中拿数据,这样即便有每秒有8000个请求,那只是把请求放在消息队列中,去拿消息队列的消息由系统自己去控制,这样就不会把整个系统给搞崩。

4.png

削峰/限流

什么是JMS MQ

全称:Java MessageService 中文:Java 消息服务。

JMS 是 Java 的一套 API 标准,最初的目的是为了使应用程序能够访问现有的MOM 系 统(MOM 是 MessageOriented Middleware 的英文缩写,指的是利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。) 后来被许多现有的 MOM 供应商采用,并实现为MOM 系统。

常见 MOM 系统包括 Apache的 ActiveMQ、阿里巴巴的 RocketMQ、IBM 的 MQSeries、Microsoft 的 MSMQ、BEA 的 RabbitMQ 等。(并非全部的 MOM 系统都遵循JMS 规范)】

基于 JMS 实现的 MOM,又被称为JMSProvider。

JMS中的一些概念

「Broker」

消息服务器,作为server提供消息核心服务

「Provider 生产者」

消息生产者是由会话创建的一个对象,用于把消息发动到一个目的地

「Consumer 消费者」

消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种方法:

同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。

异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

「P2P 点对点消息模型」

消息生产者生产消息发送到queue 中,然后消息消费者从queue 中取出并且消费消息。消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。当消费者不存在时,消息会一直保存,直到有消费消费。

「Pub/Sub 发布订阅消息模型」

消息生产者(发布)将消息发布到topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。当生产者发布消息,不管是否有消费者。都不会保存消息一定要先有消息的消费者,后有消息的生产者。

「P2P vs Pub/Sub」

5.png

P2P vs Pub/Sub

「Queue」

队列存储,常用于点对点消息模型

默认只能由唯一的一个消费者处理。一旦处理消息删除。

「Topic」

主题存储,用于订阅/发布消息模型

主题中的消息,会发送给所有的消费者同时处理。只有在消息可以重复处理的业务场景中可使用。

「ConnectionFactory」

连接工厂,jms中用它创建连接

连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。

「Connection」

JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。

「Destination 消息的目的地」

目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。

订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。

「Session」

JMS Session是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。

消息可靠性机制「确认 JMS消息」

只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。

在事务性会话中,当一个事务被提交的时候,确认自动发生。

在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:

「Session.AUTO_ACKNOWLEDGE」。当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。

「Session.CLIENT_ACKNOWLEDGE」。客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。

「Session.DUPS_ACKNOWLEDGE」。该选择只是会话迟钝的确认消息的提交。如果JMS Provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS Provider必须把消息头的JMSRedelivered字段设置为true。

「持久性」

JMS 支持以下两种消息提交模式:

「PERSISTENT」。指示JMSProvider持久保存消息,以保证消息不会因为JMS Provider的失败而丢失。

「NON_PERSISTENT」。不要求JMS Provider持久保存消息。

「优先级」

可以使用消息优先级来指示JMS Provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。「需要注意的是,JMSProvider并不一定保证按照优先级的顺序提交消息。」

「消息过期」

可以设置消息在一定时间后过期,默认是永不过期

「临时目的地」

可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。

「持久订阅」

首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须是一个topic,第二个参数是订阅的名称。

JMS Provider会存储发布到持久订阅对应的topic上的消息。如果最初创建持久订阅的客户或者任何其它客户使用相同的连接工厂和连接的客户ID、相同的主题和相同的订阅名再次调用会话上的createDurableSubscriber方法,那么该持久订阅就会被激活。

JMS Provider会向客户发送客户处于非激活状态时所发布的消息。

持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的unsubscribe方法。

「本地事务」

在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。

事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。

需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发送操作才会真正执行。需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。

ActiveMQ存储

ActiveMQ支持很多种存储方式,常见的有 KahaDB存储,AMQ存储,JDBC存储,LevelDB存储,Memory 消息存储。我们重点介绍一下KahaDB和JDBC存储方式。

KahaDB存储

KahaDB是默认的持久化策略,所有消息顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。

在data/kahadb这个目录下,会生成四个文件,来完成消息持久化 db.data 它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-*.log里面存储的消息 db.redo 用来进行消息恢复 *db-.log 存储消息内容。

6.png

kahadb文件结构

新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较 快的。默认

是32M,达到阀值会自动递增 lock文件 锁,写入当前获得kahadb读写权限的broker ,用于在集群环境下的竞争处理。

KahaDB有如下几个特性:

日志形式存储消息;

消息索引以 B-Tree 结构存储,可以快速更新;

完全支持 JMS 事务;

支持多种恢复机制kahadb 可以限制每个数据文件的大小。不代表总计数据容量。

配置方式如下:



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3