如何确定RocketMQ中消费者的线程大小

您所在的位置:网站首页 多线程最大线程数设置多少合适 如何确定RocketMQ中消费者的线程大小

如何确定RocketMQ中消费者的线程大小

2024-07-15 19:59| 来源: 网络整理| 查看: 265

背景

        随着物联网行业的发展、智能设备数量越来越多,随着设备活跃量过大,常常存在一些高并发的请求,形成了流量尖峰,过多的请求会压垮服务器,影响其他服务运行。因此,为了保护云端服务,需要对请求进行缓冲,RocketMQ就是一款非常优秀消息队列的中间件,在互联网领域久经考验,也被各个行业广泛应用。

        在上一篇文章中,介绍了RocketMQ的工作原理使用。物联网中如何使用RockeMQ

如何在配置文件端配置消费者线程大小

        当生产者将大量的消息堆积到消息队列中时,需要同步启用消费者去消费队列中的消息,达到动态平衡的效果。

        如下代码所示,在消费者类上,会使用@RocketMQMessageListener注解,并填写必要的属性:consumerThreadNumber:消费线程、主题、消费组。

@RocketMQMessageListener( // 指定消费线程大小 consumeThreadNumber = 16, topic = TOPIC_DEMO, consumerGroup = "consumer_demo_group" ) @Component public class Consumer implements RocketMQListener { private final String CHARSET = Charset.defaultCharset().name(); @Override public void onMessage(MessageExt message) { byte[] body = message.getBody(); String str = new String(body, Charset.forName(this.CHARSET)); System.out.println("消费者消费的消息为: " + str); } }

        其中的topic、consumerGroup可以指定一次就不会变啦,但是consumerThreadNumber会根据机器的性能发生变化;因此需要将其提出放到配置文件中,方便修改,比如"application.yaml"。

        那应该如何实现呢?

        其中 consumerThreadNumber = 16,表明填入的是一个static的变量,因此如果简单地使用@Value来进行注入变量是不成功的,因为它只能注入非静态变量。为了能实现从配置文件中读取变量,并转为static变量,采用了显示set方式赋值变量的方法,最后需要用@Component注解将改配置类添加进IOC容器中。

/** * 注入mq消费的线程数量 */ public static int RocketMQThreadSize; @Value("${biz.RocketMQThreadSize}") public void setRocketMQThreadSize(int threadSize) { RocketMQThreadSize = threadSize; }

        那在配置文件中就可以配置RocketMQThreadSize的大小啦。如下在application.yaml就可以搞定。

biz: RocketMQThreadSize: 200 如何使得自定义的线程大小生效

        如上一章所示,可以得到静态的变量,那如何在消费者中生效呢?幸好RocketMQ提供一个接口可以实现消费者线程的自定义。

        在消费者的类需要实现RocketMQPushConsumerLifecycleListener接口即可,然后在类中实现prepareStart方法即可。具体如下所示:

@RocketMQMessageListener( topic = TOPIC_DEMO, consumerGroup = "consumer_demo_group" ) @Component public class Consumer implements RocketMQListener, RocketMQPushConsumerLifecycleListener { private final String CHARSET = Charset.defaultCharset().name(); @Override public void onMessage(MessageExt message) { byte[] body = message.getBody(); String str = new String(body, Charset.forName(this.CHARSET)); System.out.println("消费者消费的消息为: " + str); } @Override public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) { // 指定消费线程大小 defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize); defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize); } }

        在prepareStart方法中,指定一些必要的线程参数

最大线程:defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);最小线程:defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);

并且通过实验和查看源码,其中最大、最小设置一样才会生效。

如何确定合适的线程大小

        以上的步骤已经帮忙把线程设置提取出来啦,之后只需在配置文件中修改线程数大小,而不需去代码中修改,避免代码导致系统出现问题。那如何去确定线程的数量大小呢?

        线程是计算机执行任务的基本的单位,即消费任务可以交给线程去执行。

        当线程数量较少时,CPU性能不能充分发挥。但是线程数量过的就会导致过多的线程处于等待中,机器的负载升高。因此需要确定适合当前机器的线程数量。

        在RocketMQ线程调优有两个指标可以帮助大致确定消费线程大小:

消费者的TPS,表明消费者的能力;机器负载,分为CPU负载和IO负载,和自身的核心数量有关。

        RocketMQ提供web界面,可以监测TPS的大小,这个数量当然是越大越好,但是也要考虑负载。

在这里插入图片描述

        在服务器输入top命令就可以看大,当前机器的负载:

在这里插入图片描述

分别为1、5、15分钟负载。

        在进行压测的时候,需要知道机器的核心数量,监测负载的时候负载的大小就不能超过核心数量。

        在测试的时候可以从小到大调节线程数大小,并且关注TPS和负载。

结尾

 以上就是确定消费者线程大小的整个过程,有疑问欢迎留言交流!!! 线程介绍



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3