如何排查RocketMQ消息消费积压问题

您所在的位置:网站首页 rocketmq消费组 如何排查RocketMQ消息消费积压问题

如何排查RocketMQ消息消费积压问题

2023-02-27 14:38| 来源: 网络整理| 查看: 265

RocketMQ的消息消费模型

在RocketMQ消费领域中,判断消费端遇到的瓶颈通常会用到两个重要的指标:Delay和LastConsumeTime。

在开源版本的控制台rocketmq-console界面中,我们可以查阅消费端的这两个指标:

Delay指的是消息积压数量,它是由BrokerOffset(服务端当前最大的逻辑偏移量)减去ConsumerOffset(消费者消费的当前位点)计算出来的。如果Delay值很大,说明消费端遇到了瓶颈。LastConsumeTime表示上一次成功消费消息的存储时间。这个值如果很大,同样能说明消费端遇到了瓶颈。如果这个值线上为1970年,表示消费者当前消费位点对应的消息在服务端已经过期,被删除了。

那为什么消费会积压呢?要理解这个问题,我们首先要了解RocketMQ消费者的消费处理模型。核心流程如下图所示:

说明一下具体的工作流程。

PullMessageService线程从拉取任务队列中获取一个待拉取任务PullRquest。PullMessageService线程根据PullRequest中的主题名称、队列编号、拉取位点向Broker服务器拉取一批消息。拉取到消息后,服务端会更新PullRequest中下一次拉取任务的偏移量,将其放到队列的尾部。PullMessageService线程将拉取到的消息存入到处理队列(ProcessQueue),每一个MessageQueue(Broker名称+主题名称+队列编号)对应一个处理队列。PullMessageService线程将拉取到的消息提交到线程池。PullMessageService线程将消息提交到线程池后,不会等这批消息处理完成,而是立即返回。然后PullMessageService线程重复步骤一到步骤五。当消息提交到消费线程池后,进行异步消费。消息消费成功后,会将消息从处理队列(ProcessQueue)中移除,然后获取处理队列中的最小偏移量,提交消费位点。

从这个过程中可以看出,在RocketMQ的消费处理模型中,PullMessageService线程“马不停歇”地从拉取队列中获取任务,拉完一批消息后继续再将PullRequest(待拉取任务)放入到队列末尾,确保PullMessageService可以不间断地拉取消息,从而实现Push模式的效果。

从理论设计的角度,我们不难看出产生消费积压的原因可能有两个。

第一,Pull线程不拉取消息,那就无法消费消息,没有消费消息,消费位点自然不会提交。第二,消费线程池中的线程因为某种原因阻塞,导致不消费消息,进而同样使得消费位点不提交。

针对第一点,Pull线程的run方法采用的是while(true)+try catch的模式,只要不主动关闭消费者,这个线程是不会停止的。具体的代码实现如下:

这么看来,消费积压基本都是消费线程池由于某种原因阻塞导致的。

在探究阻塞会发生在何处之前,你不妨思考一下,如果消费线程不干活,但拉取线程还一直在从服务端拉取消息,再将消息提交到消费线程池和ProcessQueue,这时会出现什么问题?

没错,内存溢出。所以,为了保护消费者进程,这个时候我们必须引入限流机制限制拉取线程的行为。

在RocketMQ中,我们主要通过三点来判断是否需要进行限流:

消息消费端队列中积压的消息超过 1000 条;消息处理队列中积压的消息尽管没有超过 1000 条,但最大偏移量和最小偏移量的差值超过2000;消息处理队列中积压的消息总大小超过100M。

RocketMQ一旦触发限流,往往会在${user_home}/logs/rocketmqlogs/rocketmq_client.log文件中打印对应的日志,如果日志中包含了关键字“so do flow control”,表明消费端存在性能瓶颈,这就是我们的突破方向。

如何排查RocketMQ消息消费积压问题?

那如何定位消费端慢在哪,又是卡在了哪行代码呢?

我们常用的排查方法是跟踪线程栈,利用jstack命令查看线程运行情况,以此探究线程的运行情况。通常可以使用下面的命令:

ps -ef | grep java jstack pid > j1.log

为了方便对比,我一般会连续打印五个文件,这样可以在五个文件中查看同一个消费者线程的状态,看它是否发生了变化。如果始终没有变化,说明该消费线程长时间阻塞,这就需要我们重点关注了。

在RocketMQ中,消费端线程以ConsumeMessageThread_打头,通过对线程的判断,可以发现下面这段代码:

这些线程的状态为RUNNABLE,并且在jstack日志中状态一直没有发生变化,说明这些线程是有问题的。通过线程栈,我们可以清楚地定位到具体的代码行。

在这个示例中,通过对线程栈的分析,我们发现是调用HTTP请求时没有设置超时时间,这就导致线程一直阻塞,对应的消息始终没有处理完成。消息一直在处理队列(ProcessQueue)中,而RocketMQ采取的又是最小位点提交机制,消费位点无法继续向前推进,这才出现了消费积压。

至此,消费积压问题的根本原因就定位出来了。

最后,我还想跟你分享几个小经验。

结合我的生产实践,通常情况下,RocketMQ消息发送问题很可能与服务端有直接关系,而RocketMQ消费端遇到的一些性能问题通常与消费进程自身有关系。

另外,消费积压的时候,可以简单关注一下这个集群其他消费者的情况。如果其他消费者没有积压,只有你负责的消费组有积压,那就一定是消费端代码的问题了。

在这里最后再强调一遍,查看线程栈并不只是去查看线程状态为BLOCKED、TIME_WRATING的线程,RUNNABLE的线程状态同样需要查看。因为在一些网络操作中(例如,HTTP请求等待返回结果时、MySQL写入/查询等待获取执行结果时),线程的状态也是RUNNABLE。

此文章为2月Day16学习笔记,内容来源于:极客时间《中间件核心技术与实战》,强烈推荐此课程。

另外,最近重温操作系统时发现了一个免费精品好课,闪客的《Linux0.11源码趣读》,这个课给我感觉像在用看小说的心态学操作系统源码,写的确实挺牛的,通俗易懂,直指本源,我自己也跟着收获了很多。

这个课在极客时间上是免费的,口碑很不错,看评论下很多人在催更和重温,强烈推荐!

戳此链接领取:https://time.geekbang.org/opencourse/intro/100310101?utm_source=linux_dk&utm_term=linux_dk

或通过下面海报领取



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3