如何确定RocketMQ中消费者的线程大小 |
您所在的位置:网站首页 › 多线程最大线程数设置多少合适 › 如何确定RocketMQ中消费者的线程大小 |
背景
随着物联网行业的发展、智能设备数量越来越多,随着设备活跃量过大,常常存在一些高并发的请求,形成了流量尖峰,过多的请求会压垮服务器,影响其他服务运行。因此,为了保护云端服务,需要对请求进行缓冲,RocketMQ就是一款非常优秀消息队列的中间件,在互联网领域久经考验,也被各个行业广泛应用。 在上一篇文章中,介绍了RocketMQ的工作原理使用。物联网中如何使用RockeMQ 如何在配置文件端配置消费者线程大小当生产者将大量的消息堆积到消息队列中时,需要同步启用消费者去消费队列中的消息,达到动态平衡的效果。 如下代码所示,在消费者类上,会使用@RocketMQMessageListener注解,并填写必要的属性:consumerThreadNumber:消费线程、主题、消费组。 @RocketMQMessageListener( // 指定消费线程大小 consumeThreadNumber = 16, topic = TOPIC_DEMO, consumerGroup = "consumer_demo_group" ) @Component public class Consumer implements RocketMQListener { private final String CHARSET = Charset.defaultCharset().name(); @Override public void onMessage(MessageExt message) { byte[] body = message.getBody(); String str = new String(body, Charset.forName(this.CHARSET)); System.out.println("消费者消费的消息为: " + str); } }其中的topic、consumerGroup可以指定一次就不会变啦,但是consumerThreadNumber会根据机器的性能发生变化;因此需要将其提出放到配置文件中,方便修改,比如"application.yaml"。 那应该如何实现呢? 其中 consumerThreadNumber = 16,表明填入的是一个static的变量,因此如果简单地使用@Value来进行注入变量是不成功的,因为它只能注入非静态变量。为了能实现从配置文件中读取变量,并转为static变量,采用了显示set方式赋值变量的方法,最后需要用@Component注解将改配置类添加进IOC容器中。 /** * 注入mq消费的线程数量 */ public static int RocketMQThreadSize; @Value("${biz.RocketMQThreadSize}") public void setRocketMQThreadSize(int threadSize) { RocketMQThreadSize = threadSize; }那在配置文件中就可以配置RocketMQThreadSize的大小啦。如下在application.yaml就可以搞定。 biz: RocketMQThreadSize: 200 如何使得自定义的线程大小生效如上一章所示,可以得到静态的变量,那如何在消费者中生效呢?幸好RocketMQ提供一个接口可以实现消费者线程的自定义。 在消费者的类需要实现RocketMQPushConsumerLifecycleListener接口即可,然后在类中实现prepareStart方法即可。具体如下所示: @RocketMQMessageListener( topic = TOPIC_DEMO, consumerGroup = "consumer_demo_group" ) @Component public class Consumer implements RocketMQListener, RocketMQPushConsumerLifecycleListener { private final String CHARSET = Charset.defaultCharset().name(); @Override public void onMessage(MessageExt message) { byte[] body = message.getBody(); String str = new String(body, Charset.forName(this.CHARSET)); System.out.println("消费者消费的消息为: " + str); } @Override public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) { // 指定消费线程大小 defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize); defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize); } }在prepareStart方法中,指定一些必要的线程参数 最大线程:defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);最小线程:defaultMQPushConsumer.setConsumeThreadMin(ConfigProperties.RocketMQThreadSize);并且通过实验和查看源码,其中最大、最小设置一样才会生效。 如何确定合适的线程大小以上的步骤已经帮忙把线程设置提取出来啦,之后只需在配置文件中修改线程数大小,而不需去代码中修改,避免代码导致系统出现问题。那如何去确定线程的数量大小呢? 线程是计算机执行任务的基本的单位,即消费任务可以交给线程去执行。 当线程数量较少时,CPU性能不能充分发挥。但是线程数量过的就会导致过多的线程处于等待中,机器的负载升高。因此需要确定适合当前机器的线程数量。 在RocketMQ线程调优有两个指标可以帮助大致确定消费线程大小: 消费者的TPS,表明消费者的能力;机器负载,分为CPU负载和IO负载,和自身的核心数量有关。RocketMQ提供web界面,可以监测TPS的大小,这个数量当然是越大越好,但是也要考虑负载。 在服务器输入top命令就可以看大,当前机器的负载: 分别为1、5、15分钟负载。 在进行压测的时候,需要知道机器的核心数量,监测负载的时候负载的大小就不能超过核心数量。 在测试的时候可以从小到大调节线程数大小,并且关注TPS和负载。 结尾以上就是确定消费者线程大小的整个过程,有疑问欢迎留言交流!!! 线程介绍 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |