RocketMQ 快速入门教程,手把手教教你干代码

您所在的位置:网站首页 qq代码发送教学 RocketMQ 快速入门教程,手把手教教你干代码

RocketMQ 快速入门教程,手把手教教你干代码

2023-06-13 06:55| 来源: 网络整理| 查看: 265

目录 RocketMQ定义为什么要用消息中间件?应用解耦流量削峰数据分发 RocketMQ各部分角色介绍NameServer主机(Broker)生产者(Producer)消费者(Consumer)消息(Message) 使用RocketMQ的核心概念主题(Topic)消息队列(Message Queue)分组(Group)标签(Tag)偏移量(Offset) 普通消息三种消息发送方式发送同步消息发送异步消息单向发送消息发送的权衡 两种消息消费方式负载均衡模式(集群消费)广播消费消息消费时的权衡 顺序消息全局有序分区有序 延时消息概念适用场景使用案例生产者消费者 批量消息使用案例一般批量发送(不考虑消息分割)批量切分发送 消息的过滤Tag过滤使用案例注意事项 Sql过滤SQL基本语法注意事项使用案例 分布式事务消息分布式事务的来龙去脉RocketMQ中的处理方案分布式事务使用案例使用限制 Request-Reply消息什么是Request-Reply?与RPC的不同Request-Reply的实现逻辑代码案例

RocketMQ定义

       消息中间件,英文Message Queue,简称MQ。它没有标准定义,一般认为:消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。 高效:对于消息的处理处理速度快,RocketMQ可以达到单机10万+的并发。 可靠:一般消息中间件都会有消息持久化机制和其他的机制确保消息不丢失。 异步:指发送完一个请求,不需要等待返回,随时可以再发送下一个请求,既不需要等待。 一句话总结:消息中间件不生产消息,只是消息的搬运工。 在这里插入图片描述

为什么要用消息中间件? 应用解耦

       系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。        使用消息中间件,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,继续处理存放在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。 在这里插入图片描述

流量削峰

       应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。 互联网公司的大促场景(双十一、店庆活动、秒杀活动)都会使用到MQ。 在这里插入图片描述

数据分发

       通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。        接口调用的弊端,无论是新增系统,还是移除系统,代码改造工作量都很大。 在这里插入图片描述 使用MQ做数据分发好处,无论是新增系统,还是移除系统,代码改造工作量较小。所以使用MQ做数据的分发,可以提高团队开发的效率。 在这里插入图片描述

RocketMQ各部分角色介绍

在这里插入图片描述

NameServer

       NameServer是整个RocketMQ的“大脑”,它是RocketMQ的服务注册中心,所以RocketMQ需要先启动NameServer再启动Rocket中的Broker。        Broker在启动时向所有NameServer注册(主要是服务器地址等),生产者在发送消息之前先从NameServer获取Broker服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。

主机(Broker)

       RocketMQ的核心,用于暂存和传输消息。

生产者(Producer)

       生产者:也称为消息发布者,负责生产并发送消息至RocketMQ。

消费者(Consumer)

        消费者:也称为消息订阅者,负责从RocketMQ接收并消费消息。

消息(Message)

        消息:生产或消费的数据,对于RocketMQ来说,消息就是字节数组。

使用RocketMQ的核心概念 主题(Topic)

       标识RocketMQ中一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定Topic。主题主要用于区分消息的种类:一个生产者可以发送消息给一个或者多个Topic,消息的消费者也可以订阅一个或者多个Topic消息。

消息队列(Message Queue)

       简称Queue或Q。消息物理管理单位。一个Topic将有若干个Q。        无论生产者还是消费者,实际的生产和消费都是针对Q级别。例如Producer发送消息的时候,会预先选择(默认轮询)好该Topic下面的某一条Q发送;Consumer消费的时候也会负载均衡地分配若干个Q,只拉取对应Q的消息。        若一个Topic创建在不同的Broker,则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上,具有水平扩展的能力。

分组(Group)

       生产者:标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。主要作用用于事务消息:        消费者:标识一类Consumer的集合名称,这类Consumer通常消费一类消息(也称为Consumer Group),且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。

标签(Tag)

       RocketMQ支持给在发送的时候给消息打tag,同一个topic的消息虽然逻辑管理是一样的。但是消费同一个topic时,如果你消费订阅的时候指定的是tagA,那么tagB的消息将不会投递。

偏移量(Offset)

       RocketMQ中,有很多offset的概念。一般我们只关心暴露到客户端的offset。不指定的话,就是指Message Queue下面的offset。        Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是offset,Message queue中的max offset表示消息的最大offset        Consumer offset可以理解为标记Consumer Group在一条逻辑Message Queue上,消息消费到哪里即消费进度。但从源码上看,这个数值是消费过的最新消费的消息offset+1,即实际上表示的是下次拉取的offset位置。

普通消息

       本章节先会使用RocketMQ提供的原生客户端的API,当然除了原生客户端外,SpringBoot、SpringCloudStream也进行了集成,但本质上这些也是基于原生API的封装,所以只需掌握原生API,其他的也会水到渠成。        Java代码中使用普通消息的整体流程如下

导入MQ客户端依赖 org.apache.rocketmq rocketmq-client 4.8.0 消息发送者步骤 创建消息生产者producer,并指定生产者组名指定Nameserver地址启动producer创建消息对象,指定Topic、Tag和消息体发送消息关闭生产者producer 消息消费者步骤 创建消费者Consumer,指定消费者组名指定Nameserver地址订阅主题Topic和Tag设置回调函数,处理消息启动消费者consumer 三种消息发送方式 发送同步消息

       同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。 在这里插入图片描述 代码演示 在这里插入图片描述 发送结果分析 在这里插入图片描述

msgId        消息的全局唯一标识(RocketMQ的ID生成是使用机器IP和消息偏移量的组成),由消息队列 MQ 系统自动生成,唯一标识某条消息。sendStatus         发送的标识:成功,失败等queueId        queueId是Topic的分区;Producer发送具体一条消息的时,对应选择的该Topic下的某一个Queue的标识ID。queueOffset        Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是queueOffset,queueOffset是从0开始递增。 发送异步消息

       异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。 在这里插入图片描述 代码演示 在这里插入图片描述 发送结果分析跟发送同步消息相同。

单向发送

       这种方式主要用在不特别关心发送结果的场景,例如日志发送。单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。 在这里插入图片描述 代码演示 在这里插入图片描述

消息发送的权衡

在这里插入图片描述

两种消息消费方式 负载均衡模式(集群消费)

       消费者采用负载均衡方式消费消息,一个分组(Group)下的多个消费者共同消费队列消息,每个消费者处理的消息不同。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。例如某个Topic有3个队列,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。 在这里插入图片描述 代码演示 在这里插入图片描述

广播消费

在这里插入图片描述        广播消费模式中消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些 Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。 代码演示 在这里插入图片描述

消息消费时的权衡 负载均衡模式:适用场景&注意事项 消费端集群化部署,每条消息只需要被处理一次。由于消费进度在服务端维护,可靠性更高。集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。 广播模式:适用场景&注意事项 每条消息都需要被相同逻辑的多台机器处理。消费进度在客户端维护,出现重复的概率稍大于集群模式。 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。目前仅 Java 客户端支持广播模式。广播消费模式下不支持顺序消息。广播消费模式下不支持重置消费位点。广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。 顺序消息 全局有序

       全局有序比较简单,主要控制在于创建Topic指定只有一个队列,同步确保生产者与消费者都只有一个实例进行即可。

分区有序

       在电商业务场景中,一个订单的流程是:创建、付款、推送、完成。在加入RocketMQ后,一个订单会分别产生对于这个订单的创建、付款、推送、完成等消息,如果我们把所有消息全部送入到RocketMQ中的一个主题中,这里该如何实现针对一个订单的消息顺序性呢!如下图: 在这里插入图片描述        要完成分区有序性,在生产者环节使用自定义的消息队列选择策略,确保订单号尾数相同的消息会被先后发送到同一个队列中(案例中主题有3个队列,生产环境中可设定成10个满足全部尾数的需求),然后再消费端开启负载均衡模式,最终确保一个消费者拿到的消息对于一个订单来说是有序的。 代码案例 生产者代码 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述 发送日志 在这里插入图片描述

消费者代码        消费时,同一个OrderId获取到的肯定是同一个队列。从而确保一个订单中处理的顺序。 在这里插入图片描述 在这里插入图片描述 注意事项

       使用顺序消息:首先要保证消息是有序进入MQ的,消息放入MQ之前,对id等关键字进行取模,放入指定messageQueue,同时consume消费消息失败时,不能返回reconsume——later,这样会导致乱序,所以应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。

延时消息 概念

       延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递(被消费者消费),而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。 在这里插入图片描述

适用场景

       消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时向RocketMQ发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。

使用案例

       Apache RocketMQ目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。(RocketMQ的商业版本Aliware MQ提供了任意时刻的定时消息功能,Apache的RocketMQ并没有,阿里并没有开源)        Apache RocketMQ发送延时消息是设置在每一个消息体上的,在创建消息时设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。        延迟消息的level,区分18个等级:level为1,表示延迟1秒后消费;level为2表示延迟5秒后消费;level为3表示延迟10秒后消费;以此类推;最大level为18表示延迟2个小时消费。具体标识如下:

level延迟11S25S310S430S51m62m73m84m95m106m117m128m139m1410m1520m1630m171h182h

是这生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level即可。消费消息跟普通的消费消息一致。

生产者

在这里插入图片描述

消费者

在这里插入图片描述查看消费者消息信息,打印消费延迟与生产时设定符合。在这里插入图片描述

批量消息

       在高并发场景中,批量发送消息能显著提高传递消息发送时的性能(减少网络连接及IO的开销)。使用批量消息时的限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK(集群时会细讲),且不能是延时消息。        在发送批量消息时先构建一个消息对象集合,然后调用send(Collection msg)系列的方法即可。由于批量消息的4MB限制,所以一般情况下在集合中添加消息需要先计算当前集合中消息对象的大小是否超过限制,如果超过限制也可以使用分割消息的方式进行多次批量发送。

使用案例 一般批量发送(不考虑消息分割)

       因为批量消息是一个Collection,所以送入消息可以是List,也可以使Set,这里为方便起见,使用List进行批量组装发送。 在这里插入图片描述

批量切分发送

       如果消息的总长度可能大于4MB时,这时候最好把消息进行分割,案例中以1M大小进行消息分割。        我们需要发送10万元素的数组,这个量很大,怎么快速发送完。使用批量发送,同时每一批控制在1M左右确保不超过消息大小限制。 在这里插入图片描述 在这里插入图片描述

消息的过滤

       在实际的开发应用中,对于一类消息尽可能使用一个Topic进行存储,但在消费时需要选择您想要的消息,这时可以使用RocketMQ的消息过滤功能,具体实现是利用消息的Tag和Key。        Key一般用于消息在业务层面的唯一标识。对发送的消息设置好 Key,以后可以根据这个 Key 来查找消息。比如消息异常,消息丢失,进行查找会很方便。RocketMQ 会创建专门的索引文件,用来存储 Key与消息的映射,由于底层实现是 Hash 索引,应尽量使 Key唯一,避免潜在的哈希冲突。        Tag可以理解为是二级分类。以淘宝交易平台为例,订单消息和支付消息属于不同业务类型的消息,分别创建OrderTopic 和PayTopic,其中订单消息根据不同的商品品类以不同的 Tag 再进行细分,如手机类、家电类、男装类、女装类、化妆品类,最后它们都被各个不同的系统所接收。通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。        Key和Tag的主要差别是使用场景不同,Key主要用于通过命令行命令查询消息,而Tag用于在消息端的代码中,用来进行服务端消息过滤。 使用Key一般使用mqadmin管理工具,具体位置在RocketMQ/bin目录下。具体文档见:https://github.com/apache/rocketmq/blob/master/docs/cn/operation.md

Tag过滤

        使用Tag过滤的方式是在消息生产时传入感兴趣的Tag标签,然后在消费端就可以根据Tag来选择您想要的消息。具体的操作是在创建Message的时候添加,一个Message只能有一个Tag。

使用案例

生产者发送60条消息,分别打上三种tag标签。 在这里插入图片描述 消费者消费时只选择TagA和TagB的消息。 在这里插入图片描述

注意事项

       Tag过滤的形式非常简单,||代表或、*代表所有,所以使用Tag过滤这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。

Sql过滤

       SQL特性可以通过发送消息时的属性来进行消息的过滤计算。具体的操作是使用SQL92标准的sql语句,前提是只有使用push模式的消费者才能用(消费的模式就是push)

SQL基本语法

数值比较:比如:>,>=,



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3