Redis与RabbitMQ配合使用多线程(多消费者)处理消息

您所在的位置:网站首页 rabbitmq多线程消费 Redis与RabbitMQ配合使用多线程(多消费者)处理消息

Redis与RabbitMQ配合使用多线程(多消费者)处理消息

2024-07-10 06:50| 来源: 网络整理| 查看: 265

引言

并发引起的服务器崩溃是非常常见的现象,为了解决这一问题,目前流行使用缓存数据库与消息队列搭配使用。在最近的项目中也是使用到这一手段,本篇文章通过一个案例为大家展示该套方案如何使用。

案例描述与流程

本案例是一个经典的并发下单的案例。在Redis中存在一条key为Apple,Value为10000的数据,为防止超卖问题的发生使用Redisson分布式锁避免超卖(在Redis解决超卖Demo这篇文章中已经讲过),在一个线程拿到锁并且符合下单条件则直接返回下单成功同时发送消息,使用AMQP监听队列消息,通过线程池创建多个线程作为消费者进行底层DB的更新。

环境准备 创建模块名为Redis

yml配置文件的编写 server: port: 9000 spring: application: name: redis datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/user?useSSL=false username: root password: 123456 redis: host: 192.168.136.130 port: 6379 password: 123456 lettuce: pool: max-active: 10 max-idle: 10 min-idle: 1 time-between-eviction-runs: 10s rabbitmq: host: 192.168.136.130 #MQ地址 port: 5672 #端口 virtual-host: / #虚拟主机 username: demo #用户密码 password: 123321 connection-timeout: 1s template: retry: #重试机制 enabled: true initial-interval: 1000ms multiplier: 1 max-attempts: 3 publisher-confirm-type: correlated publisher-returns: true Controller Test类的编写

 设置路径为  /testAddsetxAddFinally

@RestController @RequestMapping("/") public class Test { @Autowired private RedisTemplate redisTemplate; @Autowired private RabbitTemplate rabbitTemplate; /*使用setnx锁,同时给锁释放过期时间,自动释放锁 * */ @RequestMapping("testAddsetxAddFinally") String cherkAndReduceStockAddSetnxAddFinally() { Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock-stock", "0000",2, TimeUnit.SECONDS); //获取锁失败,停止50ms,递归调用 if (!lock){ try { Thread.sleep(3000); this.cherkAndReduceStockAddSetnxAddFinally(); } catch (InterruptedException e) { e.printStackTrace(); } }else { try { String stock = redisTemplate.opsForValue().get("Apple").toString(); if(stock!=null&&stock.length()!=0) { Integer valueOf = Integer.valueOf(stock); if (valueOf>0) { redisTemplate.opsForValue().set("Apple",String.valueOf(--valueOf)); //推送MQ String queue="demo.queue"; //123456为用户id 1为商品id String masg="123456:1"; rabbitTemplate.convertAndSend(queue,masg); return "抢购成功!"; }else { System.out.println("商品售罄!!!"); return "商品售罄!!!"; } } }finally { redisTemplate.delete("lock-stock"); } } return ""; } }  编写RedisConfig类序列化存储 @Configuration public class RedisConfig { @Bean public RedisTemplate redisTemplate(RedisConnectionFactory factory) { //缓存序列化配置避免存储乱码 RedisTemplate redisTemplate=new RedisTemplate(); redisTemplate.setConnectionFactory(factory); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); return redisTemplate; } }

 

创建Consumer模块

 yml文件 server: port: 9004 spring: application: name: redis datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/user?useSSL=false username: root password: 123456 redis: host: 192.168.136.130 port: 6379 password: 123456 lettuce: pool: max-active: 10 max-idle: 10 min-idle: 1 time-between-eviction-runs: 10s rabbitmq: host: 192.168.136.130 port: 5672 virtual-host: / username: demo password: 123321 connection-timeout: 1s template: retry: enabled: true initial-interval: 1000ms multiplier: 1 max-attempts: 3 publisher-confirm-type: correlated publisher-returns: true 编写order实体类 package cn.itcast.mq.pojo; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableName; import lombok.Data; import org.springframework.data.relational.core.mapping.Table; @Data @TableName("orderlist") public class order { //用户id @TableField("userId") private String userId; //商品id private String id; public order(String userId, String id) { this.userId = userId; this.id = id; } }

注意对应关系

 编写orderMapper package cn.itcast.mq.mapper; import cn.itcast.mq.pojo.order; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.apache.ibatis.annotations.Mapper; @Mapper public interface orderMapper extends BaseMapper { } 编写orderService package cn.itcast.mq.service; import cn.itcast.mq.pojo.order; import com.baomidou.mybatisplus.extension.service.IService; public interface orderService extends IService { } 编写Iml实现类 package cn.itcast.mq.service; import cn.itcast.mq.mapper.orderMapper; import cn.itcast.mq.pojo.order; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.springframework.stereotype.Service; @Service public class orderServiceImpl extends ServiceImpl implements orderService{ } 构建Listerner线程池,构建容器工厂

使用@RabbitListener注解指定消费方法,默认情况是单线程监听队列,可以观察当队列有多个任务时消费端每次只消费一个消息,单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源,可以配置mq的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。

package cn.itcast.mq.thread; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; @Configuration @EnableAsync public class ThreadPoolConfig { @Bean("customContainerFactory") public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConcurrentConsumers(10); //设置线程数 factory.setMaxConcurrentConsumers(10); //最大线程数 configurer.configure(factory, connectionFactory); return factory; } } 编写MQListener监听队列 package cn.itcast.mq.listeners; import cn.itcast.mq.pojo.order; import cn.itcast.mq.service.orderService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @Component @Slf4j public class MqListener { @Autowired private orderService orderService; //声明队列 mq的容器工厂 @RabbitListener(queues="demo.queue",containerFactory = "customContainerFactory") public void listenSimpleQueue(String msg) { //拆分消息 String[] split = msg.split(":"); order order = new order(split[0], split[1]); System.out.println(order.toString()); //保存MYSQL orderService.save(order); //测试是否多个消费者 System.out.println("线程" + Thread.currentThread().getName() + " 执行异步任务" ); } } RabbitMQ的准备 创建demo.queue队列

 创建demo用户并且配置虚拟主机

 进行测试 启动Redis和Consumer服务

 使用JMeter压测12000个用户

 开始压测

查看队列

观察Consumer控制台,一万条消息瞬间执行完成!

 查看MySQL orderlist表,有一万条数据

 查看Redis 数据库并没有出现超卖问题,案例成功!!

 附加

解决RabbitMQ消息堆积的方案有三种

增加更多消费者,提高消息速度。(本案例采用这一种)在消费者中开启线程池加快消息处理速度。扩大队列容积,提高堆积上限,采用惰性队列。  总结

通过本次演示的案例,希望大家可以掌握并且多加练习,在日常的开发中缓存数据库和异步队列是必备的手段,同时也是大家找工作时的一个亮点。本文如有不妥之处希望大家指正!!!



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3