Redis高并发缓存架构与缓存击穿、穿透、雪崩、双写不一致优化

您所在的位置:网站首页 redis击穿和穿透的解决方法 Redis高并发缓存架构与缓存击穿、穿透、雪崩、双写不一致优化

Redis高并发缓存架构与缓存击穿、穿透、雪崩、双写不一致优化

#Redis高并发缓存架构与缓存击穿、穿透、雪崩、双写不一致优化 | 来源: 网络整理| 查看: 265

中小公司常见的缓存架构

我们在一些中小的公司经常会遇到以下的这种做法,如:查询到数据后 就缓存到redis。更新(添加)数据后 缓存到redis。

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 private final SysUserService userService; @Autowired private StringRedisTemplate stringRedisTemplate; private final static String USER_INFO = "user:info:";/** * 通过ID查询用户信息 * @param id ID * @return 用户信息 */ @GetMapping("/{id}") public R user(@PathVariable Long id) { UserVO data = null; // 判断redis中是否有该用户数据 String userResult = stringRedisTemplate.opsForValue().get(USER_INFO + id); if(EmptyUtils.isNotEmpty(userResult)){ data = JSON.parseObject(userResult, UserVO.class); return R.ok(data); } data = userService.selectUserVoById(id); if(EmptyUtils.isNotEmpty(data)){ // 不为空,代表查询到了用户数据,我们就进行缓存 stringRedisTemplate.opsForValue().set(USER_INFO+data.getUserId(),JSON.toJSONString(data)); } return R.ok(data); } /** * 添加用户 * @param userDto 用户信息 * @return success/false */ @SysLog("添加用户") @PostMapping @PreAuthorize("@pms.hasPermission('sys_user_add')") public R user(@RequestBody UserDTO userDto) { if(userService.saveUser(userDto)){ stringRedisTemplate.opsForValue().set(USER_INFO+userDto.getUserId(), JSON.toJSONString(userDto)); } return R.ok(); } /** * 更新用户信息 * @param userDto 用户信息 * @return R */ @SysLog("更新用户信息") @PutMapping @PreAuthorize("@pms.hasPermission('sys_user_edit')") public R updateUser(@Valid @RequestBody UserDTO userDto) { if(userService.updateUser(userDto)){ stringRedisTemplate.opsForValue().set(USER_INFO+userDto.getUpdateBy(), JSON.toJSONString(userDto)); } return R.ok(userDto); }

但是这种做法,如果是对于京东电商、淘宝等且流量巨大时,明显是不合理的。

因为电商的商品、sku都是非常多的,如京东商品都是以亿为单位的,如果每个商品、sku都缓存的话,需要大量的redis服务器。

优化方案:冷热分离

缓存数据冷热分离 以京东商城为例,90%的商品都不会被经常访问,只有10%的商品才是热门、主推、经常被访问的商品,对于一些经常请求的商品、sku或其他数据,我们可以缓存到redis中。 对于一些不经常访问的商品(数据),我们可以不缓存,节省redis服务器的内存空间,这就是冷热分离的基本概念。 当然该模式,需要根据实际的场景和项目需求来分析,适用于一些高并发或大数据量的缓存。 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 private final static String USER_INFO = "user:info:"; private final static Integer USER_CACHE_TIMEOUT = 60 * 60 * 24;/** * 通过ID查询用户信息 * @param id ID * @return 用户信息 */ @GetMapping("/{id}") public R user(@PathVariable Long id) { UserVO data = null; // 判断redis中是否有该用户数据 String userResult = stringRedisTemplate.opsForValue().get(USER_INFO + id); if(EmptyUtils.isNotEmpty(userResult)){ data = JSON.parseObject(userResult, UserVO.class); // 如果有数据,则延长超时时间,因为说明该数据有人经常访问,修改TTL 速度是非常快的,比set get都快很多 stringRedisTemplate.expire(USER_INFO+id,USER_CACHE_TIMEOUT, TimeUnit.SECONDS); return R.ok(data); } data = userService.selectUserVoById(id); if(EmptyUtils.isNotEmpty(data)){ // 不为空,代表查询到了用户数据,我们就进行缓存 stringRedisTemplate.opsForValue().set(USER_INFO+data.getUserId(), JSON.toJSONString(data),USER_CACHE_TIMEOUT, TimeUnit.SECONDS); } return R.ok(data); } /** * 添加用户 * @param userDto 用户信息 * @return success/false */ @SysLog("添加用户") @PostMapping @PreAuthorize("@pms.hasPermission('sys_user_add')") public R user(@RequestBody UserDTO userDto) { if(userService.saveUser(userDto)){ stringRedisTemplate.opsForValue().set(USER_INFO+userDto.getUserId(), JSON.toJSONString(userDto),USER_CACHE_TIMEOUT, TimeUnit.SECONDS); } return R.ok(); } /** * 更新用户信息 * @param userDto 用户信息 * @return R */ @SysLog("更新用户信息") @PutMapping @PreAuthorize("@pms.hasPermission('sys_user_edit')") public R updateUser(@Valid @RequestBody UserDTO userDto) { if(userService.updateUser(userDto)){ stringRedisTemplate.opsForValue().set(USER_INFO+userDto.getUserId(), JSON.toJSONString(userDto),USER_CACHE_TIMEOUT, TimeUnit.SECONDS); } return R.ok(userDto); }

优化地方:

添加和更新数据时候,会存入到redis缓存中并设置超时时间,超时时间为一天(可自行调整) 查询数据时,如果redis中存在缓存,说明该数据经常被访问,我们就在该key的超时时间上 延长超时时间 超时时间是一天,如果还存在缓存说明经常被访问,因为不经常访问的都已经过期被redis删除了 缓存击穿(缓存失效) 由于redis大批量缓存在同时失效,导致大量请求同时击穿到了数据库,导致数据库瞬间压力过大甚至宕机。 解决方案 给缓存设置过期时间时,时间增加随机值,这样的话 每个缓存过期时间都不一样,就不会出现大批量缓存同时失效。

举2个例子:

1、当后台管理员批量上传1000个商品时,如果我们没有增加随机值,那么1000个商品的缓存过期时间都是一样的,就很可能存在缓存击穿的风险,因为1000个商品的缓存会同时都失效。那么如果大量的客户来访问时,请求就都会打到数据库。

2、如秒杀,后台管理人员上架秒杀商品时,是根据场次 如 14:00 、15:00 上架的,批量上架1000个商品,上架时 都会把热点商品提前存入缓存中,并设置缓存过期时间,就会存在缓存击穿风险,因为1000个商品的缓存会同时都失效。那么如果大量的客户来访问时,请求就都会打到数据库。

优化方案

结合上面的代码,我们来优化一下,给缓存的过期时间增加一个随机值,让每个key的过期时间都不一样,依次达到解决缓存击穿问题。

12345678910111213141516171819 /** * 更新用户信息 * @param userDto 用户信息 * @return R */ @SysLog("更新用户信息") @PutMapping @PreAuthorize("@pms.hasPermission('sys_user_edit')") public R updateUser(@Valid @RequestBody UserDTO userDto) { if(userService.updateUser(userDto)){ stringRedisTemplate.opsForValue().set(USER_INFO+userDto.getUserId(), JSON.toJSONString(userDto),createRandomExpTime(), TimeUnit.SECONDS); } return R.ok(userDto); }// 给超时时间增加 随机时间 public Integer createRandomExpTime(){ return USER_CACHE_TIMEOUT + new Random().nextInt(5) * 60 * 60; } 缓存穿透 查询一个不存在的数据,导致redis缓存和数据库都不会命中。 如果此时高并发访问(或黑客CC攻击),大量请求会直接穿透到数据库,导致数据库处理卡顿甚至宕机。

缓存击穿

如图所示:此时客户端查询一条记录,我们在nginx中本地缓存查询不到,则会在jvm 内存缓存查询,如果内存缓存查询不到就会去redis中查询,redis还查不到,就会直接 去数据库中查询,如果数据库都查不到 这就叫 缓存穿透,查询一个不存在的数据,导致缓存和数据库都不会命中。

缓存穿透的风险

以京东为例,如果黑客想要通过缓存穿透进行压力攻击的话,只需要找到不存在的数据即可。

如:https://item.jd.com/100024707737.html ,这是一个商品链接,假设我们在100024707737 后面随机增加几个值,然后通过DDOS 或 压测软件疯狂的去访问,大量的请求会去查询一个不存在的商品,redis肯定查询不到数据,所以就会去查询数据库,直接穿透到数据库,数据库的瞬间压力过大,很可能会宕机。

以京东这个商品网站为例,因为京东这个是生成了静态页面,所以如果我们输入不存在的页面,在nginx肯定就已经被重定向了。当然上面这个只是举一个例子,核心本质 就是 查询一些不存在的数据,导致缓存层(redis)查询不到后 继而去查询存储层(数据库),这就是缓存穿透。

希望大家能举一反三,反复斟酌,解决在生产环境中可能遇到的缓存穿透问题。

造成缓存穿透的基本原因 自身业务出现BUG或数据出问题(根据墨菲定律,脏数据一定会出问题) 恶意攻击或爬虫造成大量空命中 穿透解决方案

解决缓存穿透问题主要有2个方案:

缓存空对象 布隆过滤器 缓存空对象

为了安全考虑,如果第一次从数据库中查询不到,就会在redis中创建一个空值,并设置过期时间(1分钟,具体自己调整)。

这样的话,下次在来请求该数据,会直接从缓存直接返回,避免再次请求数据库。

1234567891011121314151617181920212223242526272829303132333435363738private final static String USER_INFO = "user:info:";private final static Integer USER_CACHE_TIMEOUT = 60 * 60 * 24;private final static String EMPTY_CACHE = "{}";/** * 通过ID查询用户信息 * @param id ID * @return 用户信息 */@GetMapping("/{id}")public R user(@PathVariable Long id) { UserVO data = null; // 判断redis中是否有该用户数据 String userResult = stringRedisTemplate.opsForValue().get(USER_INFO + id); if(EmptyUtils.isNotEmpty(userResult)){ // 判断是否为空值 if(EMPTY_CACHE.equals(userResult)){ // 如果为空,说明该用户经常被请求,则延长缓存60秒,然后返回空。 stringRedisTemplate.expire(USER_INFO+id,60, TimeUnit.SECONDS); return null; } data = JSON.parseObject(userResult, UserVO.class); // 如果有数据,则延长超时时间,因为说明该数据有人经常访问,修改TTL 速度是非常快的,比set get都快很多 stringRedisTemplate.expire(USER_INFO+id,createRandomExpTime(), TimeUnit.SECONDS); return R.ok(data); } data = userService.selectUserVoById(id); if(EmptyUtils.isNotEmpty(data)){ // 不为空,代表查询到了用户数据,我们就进行缓存 stringRedisTemplate.opsForValue().set(USER_INFO+data.getUserId(), JSON.toJSONString(data),createRandomExpTime(), TimeUnit.SECONDS); }else{ // 查询不到数据就存储一个空json字符串 stringRedisTemplate.opsForValue().set(USER_INFO+data.getUserId(), EMPTY_CACHE,60, TimeUnit.SECONDS); } return R.ok(data);}

优化细节:

如果数据库查询不到数据,就创建一个空的json字符串,过期时间为60秒(别搞太长) 如果不设置过期时间的话或时间设置太长,黑客并发攻击时,redis内存空间会存在大量的空字符串,占用宝贵的内存空间 在redis中如果拿到了数据,判断数据是否是空的json字符串{},如果是的话就延长该key的过期时间并返回null 布隆过滤器

对于恶意攻击导致的缓存穿透,除了缓存空对象,还可以引入 布隆过滤器,布隆过滤器会先对请求进行一次过滤,对于不存在的数据会直接返回错误,不会往后面执行。

布隆过滤器根据内部hash算法可以判断出 查询的key是否存在,他认为存在就可能存在,认为不存在就肯定不存在。

布隆过滤器

布隆过滤器就是一个大型的位数组和几个不一样的无偏 hash 函数。所谓无偏就是能够把元素的 hash 值算得比较均匀。

添加key

当我们向布隆过滤中添加 key时,会根据key 使用多个hash函数进行哈希运算,得出一个整数索引值,然后根据数组的总长度对索引值进行取模运算,得出数组存放位置,每个hash函数的位置都不同,在把每个存放的位置都置为1 。

查询key

当我们向布隆过滤器中查询key是否存在时,会跟添加时候一样计算出位置,并且确认 所有位置是否都为1 ,如果有一个为0,key就不存在,如果都为1 也并不一定代表存在,只是极有可能,但是我们认为他存在 给他放行。

布隆过滤器适合对 数据命中率要求不高,且较为固定、实时性低的应用场景,缓存空间占用很少,但是维护比较麻烦。

注意:布隆过滤器不能删除数据,如果要删除得重新初始化数据。

可以用redisson实现布隆过滤器,引入依赖:

12345 org.redisson redisson 3.6.5

示例伪代码:

123456789101112131415161718192021222324252627package com.redisson;import org.redisson.Redisson;import org.redisson.api.RBloomFilter;import org.redisson.api.RedissonClient;import org.redisson.config.Config;public class RedissonBloomFilter { public static void main(String[] args) { Config config = new Config(); config.useSingleServer().setAddress("redis://localhost:6379"); //构造Redisson RedissonClient redisson = Redisson.create(config); RBloomFilter bloomFilter = redisson.getBloomFilter("nameList"); //初始化布隆过滤器:预计元素为100000000L,误差率为3%,根据这两个参数会计算出底层的bit数组大小 bloomFilter.tryInit(100000000L,0.03); //将chenguanxi插入到布隆过滤器中 bloomFilter.add("chenguanxi"); //判断下面号码是否在布隆过滤器中 System.out.println(bloomFilter.contains("zhangbaizhi"));//false System.out.println(bloomFilter.contains("wangbaoqiang"));//false System.out.println(bloomFilter.contains("chenguanxi"));//true }}

使用布隆过滤器需要把所有数据提前放入布隆过滤器,并且在增加数据时也要往布隆过滤器里放,布隆过滤器缓存过滤伪代码:

1234567891011121314151617181920212223242526272829303132333435//初始化布隆过滤器RBloomFilter bloomFilter = redisson.getBloomFilter("nameList");//初始化布隆过滤器:预计元素为100000000L,误差率为3%bloomFilter.tryInit(100000000L,0.03); //把所有数据存入布隆过滤器void init(){    for (String key: keys) {        bloomFilter.put(key);    }}String get(String key) {    // 从布隆过滤器这一级缓存判断下key是否存在    Boolean exist = bloomFilter.contains(key);    if(!exist){        return "";    }    // 从缓存中获取数据    String cacheValue = cache.get(key);    // 缓存为空    if (StringUtils.isBlank(cacheValue)) {        // 从存储中获取        String storageValue = storage.get(key);        cache.set(key, storageValue);        // 如果存储数据为空, 需要设置一个过期时间(300秒)        if (storageValue == null) {            cache.expire(key, 60 * 5);        }        return storageValue;    } else {        // 缓存非空        return cacheValue;    }} 缓存雪崩 缓存雪崩指的是缓存层(redis)宕机了,请求会直接打到存储层(数据库),之前有缓存层可以有效的保护存储层请求不会直接请求到存储层,但是如果由于某种原因导致 缓存层挂了,如:高并发请求,那么存储层也很可能跟着一起挂掉,而请求数据的服务器随着线程池越来越多,也会内存溢出,如雪崩一样 所有环上的节点都出现问题

由于一般单机redis节点的QPS只有十万+,如果突然出现了爆发性的并发请求。

很可能会直接把redis节点打挂掉,如果redis节点挂了,请求就会打到数据库来,数据库也跟着挂掉,那么所有服务器和中间件都会和雪崩一样,全部瘫痪,所有的请求都会直接超时 即便redis节点没有挂掉,也会处于一个 hang的状态(暂停服务,挂起) 由于客户端的请求都超时了,前面的客户端(服务器)的请求就会阻塞在线程池里面,随着请求越来越多,客户端的线程池满了,内存也会跟着满,就会出现 OOM 内存泄漏溢出的情况。

如:微博大V 出了花边新闻,他的粉丝会一拥而入,可能会有几十万甚至几百万爆发性增长的访问量,redis集群很可能会撑不住,一旦redis撑不住,请求会直接打到数据库来,数据库很可能也会跟着挂掉。

一般情况都是 秒杀、或者 特大热点数据,才会出现这种 缓存雪崩的情况。

解决方案

解决缓存雪崩的方式有很多,有以下几点:

保证缓存层的高可用,如Redis Sentinel或Redis Cluster 使用 后端限流熔断并降级的组件,如Sentinel或Hystrix限流降级组件 提前做高并发测试,为避免以后突发情况,需要提前演练 高并发的场景,做一些基础的预案 双缓存架构 双缓存架构

我们可以在redis缓存之前,在增加一层 jvm的缓存,redis缓存单机的QPS在十万左右,而jvm的缓存qps则最少是百万级的,但是jvm的缓存有一个缺点:内存有限,存储的数据有限。

因为内存有限,我们需要对缓存进行一定的管理,定时删除一些 不是很热点的数据,只保存最热门的数据(访问的人并发自然高),为此我们可以通过 jvm的缓存框架来实现,如:Guava Cache

双缓存架构(热门预热的)

在一些大厂里面,如 微博,都是有专门做 缓存预热的系统,如 特别热门的数据,会提前的存储在jvm缓存中。

如图所示,java服务端会订阅预热系统的通道,当预热系统检测到有可能存在高并发大流量的热门数据,就会 发送通知给java服务端,java服务端就会把数据 通过jvm缓存框架 存储在缓存中。

jvm的QPS是非常强的,纯粹的访问内存缓存,效率比redis高了好几倍,随着服务器的集群部署,能抗的并发也是非常高的。

当然,jvm内存缓存只会存储一些可能产生 高并发的热门数据,对于一些不是很热门或冷门数据,如果内存缓存中不存在,还是会从redis乃至数据库中去查询。

JVM缓存代码示意图

判断jvm内存缓存是否存在数据,如果查询到内存缓存中有数据就直接返回,如果不存在则 代表是不热门或冷门数据,走redis或数据库查询。 冷门数据突发性热点重建缓存 当一个冷门的数据(比如:商品,没有建立缓存),突然变成热门数据,被大量的并发访问,进行重建缓存,数据库瞬间压力压力暴增。

案例1:

冷门数据突发性情况

秒杀场景是非常常见的,如 抖音直播时,主播大V推销一款冷门商品,几万人乃至几十万人同时访问该商品,几十万的并发突然同一时间请求接口,由于冷门商品之前没有建立redis缓存,这些请求都会打到数据库去,数据库如果扛不住会直接宕机。

单机架构解决方案 在单机架构下,我们可以通过 同步锁 + 双重检测 来解决。 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758private final SysUserService userService;@Autowiredprivate StringRedisTemplate stringRedisTemplate;private final static String USER_INFO = "user:info:";private final static Integer USER_CACHE_TIMEOUT = 60 * 60 * 24;private final static String EMPTY_CACHE = "{}";/** * 通过ID查询用户信息 * * @param id ID * @return 用户信息 */@GetMapping("/{id}")public R user(@PathVariable Long id) { UserVO data = null; // 判断redis中是否有该用户数据 UserVO userResult = getUserHotCacheInfo(id); if(userResult != null){ return R.ok(userResult); } synchronized (this) { userResult = getUserHotCacheInfo(id); if(userResult != null){ return R.ok(userResult); } data = userService.selectUserVoById(id); if (EmptyUtils.isNotEmpty(data)) { // 不为空,代表查询到了用户数据,我们就进行缓存 stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), JSON.toJSONString(data), createRandomExpTime(), TimeUnit.SECONDS); } else { // 查询不到数据就存储一个空json字符串 stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), EMPTY_CACHE, 60, TimeUnit.SECONDS); } } return R.ok(data);}public UserVO getUserHotCacheInfo(Long id) { UserVO data = null; String userResult = stringRedisTemplate.opsForValue().get(USER_INFO + id); if (EmptyUtils.isNotEmpty(userResult)) { // 判断是否为空值 if (EMPTY_CACHE.equals(userResult)) { // 如果为空,说明该用户经常被请求,则延长缓存60秒,然后返回空。 stringRedisTemplate.expire(USER_INFO + id, 60, TimeUnit.SECONDS); return new UserVO(); } data = JSON.parseObject(userResult, UserVO.class); // 如果有数据,则延长超时时间,因为说明该数据有人经常访问,修改TTL 速度是非常快的,比set get都快很多 stringRedisTemplate.expire(USER_INFO + id, createRandomExpTime(), TimeUnit.SECONDS); } return data;}

缺点:

一旦使用了同步锁锁住了代码,此时访问代码块的所有线程都会串行执行,即便访问的冷门数据不是同一个,也要 乖乖的排队。 举例:2个大V在直播,推荐的两个冷门商品都不同,如果使用了同步锁,那么在大量请求访问这两个商品时,请求进入接口后都得变成串行化执行,非常影响效率,性能也得不到体现。 如:此时有两个商品:商品 101、商品102 ,商品101 进入接口后进入同步锁,此时102 也进入了接口后会阻塞线程,等待商品101的线程锁释放。 最终解决方案 通过 redisson的分布式锁来解决 冷门数据突发性热点重建缓存的问题。 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364private final static String USER_INFO = "user:info:";private final static Integer USER_CACHE_TIMEOUT = 60 * 60 * 24;private final static String EMPTY_CACHE = "{}";private final static String LOCK_USER_HOT_CACHE_CREATE_PREFIX = "lock:user:hot_cache_create:";@Autowiredprivate Redisson redisson;/** * 通过ID查询用户信息 * * @param id ID * @return 用户信息 */@GetMapping("/{id}")public R user(@PathVariable Long id) { UserVO data = null; // 判断redis中是否有该用户数据 UserVO userResult = getUserHotCacheInfo(id); if (userResult != null) { return R.ok(userResult); } // 创建 redisson分布式锁 RLock hotCacheCreateLock = redisson.getLock(LOCK_USER_HOT_CACHE_CREATE_PREFIX + id); // 加锁 hotCacheCreateLock.lock(); try { userResult = getUserHotCacheInfo(id); if (userResult != null) { return R.ok(userResult); } data = userService.selectUserVoById(id); if (EmptyUtils.isNotEmpty(data)) { // 不为空,代表查询到了用户数据,我们就进行缓存 stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), JSON.toJSONString(data), createRandomExpTime(), TimeUnit.SECONDS); } else { // 查询不到数据就存储一个空json字符串 stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), EMPTY_CACHE, 60, TimeUnit.SECONDS); } }finally { // 释放锁 hotCacheCreateLock.unlock(); } return R.ok(data);}public UserVO getUserHotCacheInfo(Long id) { UserVO data = null; String userResult = stringRedisTemplate.opsForValue().get(USER_INFO + id); if (EmptyUtils.isNotEmpty(userResult)) { // 判断是否为空值 if (EMPTY_CACHE.equals(userResult)) { // 如果为空,说明该用户经常被请求,则延长缓存60秒,然后返回空。 stringRedisTemplate.expire(USER_INFO + id, 60, TimeUnit.SECONDS); return new UserVO(); } data = JSON.parseObject(userResult, UserVO.class); // 如果有数据,则延长超时时间,因为说明该数据有人经常访问,修改TTL 速度是非常快的,比set get都快很多 stringRedisTemplate.expire(USER_INFO + id, createRandomExpTime(), TimeUnit.SECONDS); } return data;}

代码执行流程

image-2image-3

执行流程:

判断redis中是否有该用户数据(getUserHotCacheInfo) getUserHotCacheInfo方法 如果没有数据,则返回null,如果redis缓存保存的是 空的json字符串{},这是之前防止缓存穿透的优化代码,我们就返回空的实体类,空的实体类和null还是有区别的。 返回的userResult,有三种可能性 空的实体类,UserVO data = new UserVO() redis中有该用户的数据,直接返回 redis中没有该用户的数据,继续往下执行去数据库中查询数据 创建分布式锁,加锁,抢到锁的线程继续往下执行,没抢到锁的线程阻塞等待,然后while自循环尝试加锁 抢到锁的线程,在分布式锁里面,再次执行getUserHotCacheInfo方法,进行二次检查 返回的userResult,有三种可能性,和上面一样,如果userResult返回null,则继续往下执行 根据用户id查询数据库 如果数据库中查询到数据,就加入redis缓存并返回给客户 如果数据库中查询不到数据,就创建一个空的 json字符串,防止缓存穿透,设置过期时间 60秒 因为我们加了 try catch finally,最终结果不论如何,都会执行 释放锁的命令 当持有锁的线程释放锁后,唤醒等待的线程,线程再次去争夺锁,抢到锁后 进入锁内的方法 会再次进行二次检查,此时redis中肯定已经有对应的缓存了,就不会再走数据库查询,会直接返回 缓存与数据库双写不一致 在高并发场景下,多个线程同时操作数据库和缓存,一定会存在并发安全问题。

导致缓存与数据库双写不一致的情况非常多,这里只举2个例子,更多的是在实际项目中思考,如何规避掉不一致的情况。

值得注意的是,该情况一般只会出现在 大型的互联网项目、并发较高的场景中或存在多线程同时操作数据库和缓存的场景。

在一般的常规项目中,并发的几率很少,很少会遇到缓存双写不一致的情况,所以不用考虑这个双写不一致的问题,应当根据实际的规模和业务场景分析,看是否要使用 分布式锁来解决缓存不一致的问题。

案例一执行流程图

如图所示,这里有2个线程,thread 1,2 (下面都这样叫),他们各自的行为操作:

thread1 写入数据库 stock=10后更新缓存。 thread2去修改数据库库存,stock = 6 后 更新缓存。

问题来了,如thread1 写入数据库 stock = 10后更新缓存,在更新缓存的过程中,thread2也开始写数据库(stock = 6), 并比thread1 更早的更新缓存,此时stock = 6,但是当thread1 执行完了之后 又会把stock 改成10 。

那么此时,缓存中实际上是10,而数据库是6,这就造成了 缓存与数据库双写不一致的情况。

案例二image-2

如图所示,这里有2个线程,thread 2,3 (下面都这样叫),他们各自的行为操作:

thread2去修改数据库库存,stock = 6 后 更新缓存。 thread3 查询缓存,缓存是空的,在去查询数据库,取得stock后 会更新到缓存中 ,更新stock = 10。

thread3去查询缓存,发现是空的,在去查询数据库stock=10后准备更新到缓存中,刚好thread2 写入数据库 改变库存 stock =6,并且删除了缓存,并且thread2执行比thread3 更早结束,那么此时 缓存中是没有该记录的,当thread3执行更新缓存结束后,结果还是stock = 6,那么 此时的 缓存中的stock是 10,数据库是6,这又造成了 双写不一致。

解决方案 对于一些并发机率很小的数据,如 个人的订单数据,购物车,用户数据等,就不需要考虑双写不一致的问题,因为很少会出现,即便出现了 缓存不一致,我们通过对缓存设置超时时间,也可以解决。 就算并发很高,如果业务上能容忍短时间的缓存数据不一致(如商品名称,商品分类菜单等),缓存加上过期时间依然可以解决大部分业务对于缓存的要求。 对于要求强一致性的数据,如 库存,商品价格等,我们就可以通过 分布式读写锁 来解决 不一致的问题。 也可以用阿里开源的canal通过监听数据库的binlog日志及时的去修改缓存,但是对系统侵入性比较大 解决方案

互联网大部分场景都是读多写少 的场景,加入了缓存确实能提高查询的一个效率,如果是读多写多的场景,又必须保证数据的一致性,就需要谨慎使用缓存,存入缓存的数据应该是对实时性和一致性要求不高的数据。

注意:引入了分布式锁势必就会降低并发的性能,应当谨慎使用。

分布式锁分布式锁

在高并发场景下,多个线程同时操作数据库和缓存,一定会存在并发安全问题,我们可以通过 分布式锁来解决。

如:

thread3 在查询数据库之前加锁,直到 更新完缓存后 在释放锁。

thread2 在操作数据库之前加锁,操作完数据库并删除缓存后 释放锁。

核心思想:当某个线程在操作一个商品的数据时,通过分布式锁确保不会有其他线程介入进来同时操作,只有该线程执行结束后,才允许其他线程来操作该商品。

以下代码,我们通过操作user,来模拟操作商品。 把 user当成 product 商品来看就可以了。

查询用户:

优化思路:

在查询数据库之前,为了确保现场安全,不让其他线程来操作这个user(或看成商品信息),我们对该商品信息的key进行加锁 操作完该商品的数据库和缓存后,在释放锁

分布式锁执行流程

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152 private final static String USER_INFO = "user:info:"; private final static Integer USER_CACHE_TIMEOUT = 60 * 60 * 24; private final static String EMPTY_CACHE = "{}"; private final static String LOCK_USER_HOT_CACHE_CREATE_PREFIX = "lock:user:hot_cache_create:"; private final static String LOCK_USER_UPDATE_PREFIX = "lock:user:update:"; @Autowired private Redisson redisson;@GetMapping("/{id}") public R user(@PathVariable Long id) { UserVO data = null; // 判断redis中是否有该用户数据 UserVO userResult = getUserHotCacheInfo(id); if (userResult != null) { return R.ok(userResult); } // 创建 redisson分布式锁 RLock hotCacheCreateLock = redisson.getLock(LOCK_USER_HOT_CACHE_CREATE_PREFIX + id); // 加锁 hotCacheCreateLock.lock(); try { userResult = getUserHotCacheInfo(id); if (userResult != null) { return R.ok(userResult); } // 查询数据库之前,先进行加锁 RLock userUpdateLock = redisson.getLock(LOCK_USER_UPDATE_PREFIX + id); userUpdateLock.lock(); try { data = userService.selectUserVoById(id); if (EmptyUtils.isNotEmpty(data)) { // 不为空,代表查询到了用户数据,我们就进行缓存 stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), JSON.toJSONString(data), createRandomExpTime(), TimeUnit.SECONDS); } else { // 查询不到数据就存储一个空json字符串 stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), EMPTY_CACHE, 60, TimeUnit.SECONDS); } }finally { userUpdateLock.unlock(); } }finally { hotCacheCreateLock.unlock(); } return R.ok(data); }

更新用户信息:

优化思路:

更新用户信息之前,我们要先加锁,确保其他线程不会在我们更新的过程中来修改用户数据库和redis缓存,造成我们的线程安全问题 操作结束后 释放锁

更新用户信息

12345678910111213141516171819202122/** * 更新用户信息 * * @param userDto 用户信息 * @return R */ @SysLog("更新用户信息") @PutMapping @PreAuthorize("@pms.hasPermission('sys_user_edit')") public R updateUser(@Valid @RequestBody UserDTO userDto) { // 更新用户之前,先进行加锁 RLock userUpdateLock = redisson.getLock(LOCK_USER_UPDATE_PREFIX + userDto.getUserId()); userUpdateLock.lock(); try { if (userService.updateUser(userDto)) { stringRedisTemplate.opsForValue().set(USER_INFO + userDto.getUserId(), JSON.toJSONString(userDto), createRandomExpTime(), TimeUnit.SECONDS); } }finally { userUpdateLock.unlock(); } return R.ok(userDto); } 什么场景下使用分布式锁来解决

值得注意的是,该情况一般只会出现在 大型的互联网项目、并发较高的场景中或存在多线程同时操作数据库和缓存的场景。

在一般的常规项目中,很少会遇到缓存双写不一致的情况,所以应当根据实际的规模和业务场景分析,看是否要使用 分布式锁来解决缓存不一致的问题。

关于代码复杂度的问题

image-1

大部分用户请求该接口,都是为了查询用户信息,而用户信息在redis中有存储的,所以 很少会走到下面的复杂逻辑,即便走到了下面的逻辑,也就第一次来查询的时候,redis中不存在数据的情况才会出现,比起 直接出现了缓存穿透和双写不一致的风险来说,是可以接受的。

可以把用户看成商品,这里我没有去做商品的原因,是刚好没有商品的demo,换言之,大部分用户请求该接口都是为了查询该商品的信息,而redis中如果有商品信息,都会在第一步就返回了。如果没有,也就会走下面的逻辑,但仅仅也只有一次。

分布式读写锁(推荐)

除了分布式锁外,还可以通过 分布式读写锁来解决缓存与数据库双写不一致的问题,通过 读写锁来解决双写不一致的问题,效率会比 普通的分布式锁效率高。

读写锁,分为 读锁和写锁,写锁和写锁互斥,读写和写锁互斥,读锁和读锁 不互斥(相当于无锁),一般在操作或查询数据库之前使用,避免线程相互之间干扰造成的线程安全问题。

写读:写锁和读锁同时竞争一个锁,假设 A线程抢到了这个写锁,那么B线程的读锁就会阻塞线程等待A线程释放锁。

写写:写锁和写锁同时竞争一个锁,假设 A线程抢到了这个写锁,那么B写锁就会阻塞线程等待 B线程释放锁。

读读:多个线程同时访问读锁时,相当于无锁,不需要阻塞,可以并发同时执行。

注意:写锁和读锁的key 要一致,才能进行互斥,保证并发下的线程安全。

实现原理

分布式读写锁实现流程

1、读读(并发执行)

当线程创建读写锁方法,根据读取的锁key名,发现锁已经被别的线程持有了,通过model字段类型 判断当前的锁 角色,锁的model 是读锁,线程执行的方法也是读锁,就是读读,那么就会实现 重入锁,并且 线程并行执行,不会进入阻塞状态。

2、写读

当线程创建读写锁方法,发现锁已经被别的线程持有了,model 类型是 写锁,且当前线程的方法是创建读锁,那么线程就会进入阻塞状态,等待 持有写锁的线程释放锁。

3、写写

当线程创建读写锁方法,发现锁已经被别的线程持有了,model 类型是 写锁,且当前线程的方法是创建写锁,那么线程就会进入阻塞状态,等待 持有写锁的线程释放锁。

代码实现

读锁:

12345678910111213141516171819 // 创建读写锁对象RReadWriteLock userUpdateLock = redisson.getReadWriteLock(LOCK_USER_UPDATE_PREFIX + id);// 创建读锁RLock readLock = userUpdateLock.readLock();// 加锁readLock.lock();try { data = userService.selectUserVoById(id); if (EmptyUtils.isNotEmpty(data)) { // 不为空,代表查询到了用户数据,我们就进行缓存 stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), JSON.toJSONString(data), createRandomExpTime(), TimeUnit.SECONDS); } else { // 查询不到数据就存储一个空json字符串 stringRedisTemplate.opsForValue().set(USER_INFO + data.getUserId(), EMPTY_CACHE, 60, TimeUnit.SECONDS); }}finally { readLock.unlock();;}

写锁:

12345678910111213141516171819202122232425262728293031/** * 更新用户信息 * * @param userDto 用户信息 * @return R */ @SysLog("更新用户信息") @PutMapping @PreAuthorize("@pms.hasPermission('sys_user_edit')") public R updateUser(@Valid @RequestBody UserDTO userDto) { // 更新用户之前,先进行加锁// RLock userUpdateLock = redisson.getLock(LOCK_USER_UPDATE_PREFIX + userDto.getUserId());// userUpdateLock.lock(); // 创建读写锁对象 RReadWriteLock userUpdateLock = redisson.getReadWriteLock(LOCK_USER_UPDATE_PREFIX + id); // 创建写锁 RLock writeLock = userUpdateLock.writeLock(); // 加锁 writeLock.lock(); try { if (userService.updateUser(userDto)) { stringRedisTemplate.opsForValue().set(USER_INFO + userDto.getUserId(), JSON.toJSONString(userDto), createRandomExpTime(), TimeUnit.SECONDS); } }finally { // 释放锁 writeLock.unlock(); } return R.ok(userDto); } 什么情况下使用读写锁

一般都是用于解决双写不一致的情况下使用,那什么时候使用 读锁,什么时候使用 写锁?

这里的读和写,指的是 对数据库的读和写,而并非redis,对数据库修改操作时,使用写锁。对数据库只是读取,但是会更新到缓存,则使用 读锁。

读写锁Lua源码

实现的方法和之前一样,只是多了一个model 来区分是 读锁还是写锁。

读写锁lua源码

串行转并行 分布式锁在部分场景下也是可以进行优化的,比如把串行变成并行执行。

如下图,我们使用了 tryLock,该方法会在指定的时间内尝试加锁,如果时间过了后还没加锁成功,就会直接返回false。

如,我们设定了1秒,但是查询业务1秒返回是最基本的。这个时间可以根据业务场景自行调控。

优缺点

缺点:可能存在bug,如果其他线程业务执行超过了 指定时间(如 1秒),没抢到锁的线程就会直接返回false,就会出现BUG。

优点:能大幅度的提升性能,之前的分布式锁,抢到锁的线程需要释放锁,没抢到锁的线程阻塞等待唤醒,唤醒后继续抢锁,抢锁后业务执行结束再释放锁。

如果使用了tryLock 这里的场景,抢到锁的线程业务执行结束会释放锁,但是没抢到锁的线程 不会阻塞等待,而是在指定时间内不停的重试加锁,直到时间结束,与之相比,少了 阻塞和 线程之间抢锁 解锁的等待时间。

具体业务场景具体分析,不能一概而论,也没有完美的方案,作为一名架构师,应当 权利利弊,对技术有一个正确的认知,并在合理的场景下使用合理的方案。

image-1 开发规范与性能优化 键值设计1. key名设计 (1)【建议】: 可读性和可管理性

以业务名(或数据库名)为前缀(防止key冲突),用冒号分隔,比如业务名:表名:id

​ trade:order:1

(2)【建议】:简洁性

保证语义的前提下,控制key的长度,当key较多时,内存占用也不容忽视,例如:

​ user:{uid}:friends:messages:{mid} 简化为 u:{uid}:fr:m:{mid}

(3)【强制】:不要包含特殊字符

反例:包含空格、换行、单双引号以及其他转义字符

2. value设计 (1)【强制】:拒绝bigkey(防止网卡流量、慢查询)

在Redis中,一个字符串最大512MB,一个二级数据结构(例如hash、list、set、zset)可以存储大约40亿个(2^32-1)个元素,但实际中如果下面两种情况,我就会认为它是bigkey。

字符串类型:它的big体现在单个value值很大,一般认为超过10KB就是bigkey。 非字符串类型:哈希、列表、集合、有序集合,它们的big体现在元素个数太多。

一般来说,string类型控制在10KB以内,hash、list、set、zset元素个数不要超过5000。

反例:一个包含200万个元素的list。

非字符串的bigkey,不要使用del删除,使用hscan、sscan、zscan方式渐进式删除,同时要注意防止bigkey过期时间自动删除问题(例如一个200万的zset设置1小时过期,会触发del操作,造成阻塞)

ibgkey的危害1.导致redis阻塞2.网络拥塞

bigkey也就意味着每次获取要产生的网络流量较大,假设一个bigkey为1MB,客户端每秒访问量为1000,那么每秒产生1000MB的流量,对于普通的千兆网卡(按照字节算是128MB/s)的服务器来说简直是灭顶之灾,而且一般服务器会采用单机多实例的方式来部署,也就是说一个bigkey可能会对其他实例也造成影响,其后果不堪设想。

3.过期删除

有个bigkey,它安分守己(只执行简单的命令,例如hget、lpop、zscore等),但它设置了过期时间,当它过期后,会被删除,如果没有使用Redis 4.0的过期异步删除(lazyfree-lazy-expire yes),就会存在阻塞Redis的可能性。

bigkey的产生

一般来说,bigkey的产生都是由于程序设计不当,或者对于数据规模预料不清楚造成的,来看几个例子:

(1) 社交类:粉丝列表,如果某些明星或者大v不精心设计下,必是bigkey。

(2) 统计类:例如按天存储某项功能或者网站的用户集合,除非没几个人用,否则必是bigkey。

(3) 缓存类:将数据从数据库load出来序列化放到Redis里,这个方式非常常用,但有两个地方需要注意,第一,是不是有必要把所有字段都缓存;第二,有没有相关关联的数据,有的同学为了图方便把相关数据都存一个key下,产生bigkey。

如何优化bigkey 拆

big list: list1、list2、…listN

big hash:可以讲数据分段存储,比如一个大的key,假设存了1百万的用户数据,可以拆分成200个key,每个key下面存放5000个用户数据

如果bigkey不可避免,也要思考一下要不要每次把所有元素都取出来(例如有时候仅仅需要hmget,而不是hgetall),删除也是一样,尽量使用优雅的方式来处理。 (2)【推荐】:选择适合的数据类型。

例如:实体类型(要合理控制和使用数据结构,但也要注意节省内存和性能之间的平衡)

反例:

​ set user:1:name tom set user:1:age 19 set user:1:favor football

正例:

​ hmset user:1 name tom age 19 favor football

3.【推荐】:控制key的生命周期,redis不是垃圾桶。

建议使用expire设置过期时间(条件允许可以打散过期时间,防止集中过期)。

命令使用

1.【推荐】 O(N)命令关注N的数量

例如hgetall、lrange、smembers、zrange、sinter等并非不能使用,但是需要明确N的值。有遍历的需求可以使用hscan、sscan、zscan代替。

2.【推荐】:禁用命令

禁止线上使用keys、flushall、flushdb等,通过redis的rename机制禁掉命令,或者使用scan的方式渐进式处理。

3.【推荐】合理使用select

redis的多数据库较弱,使用数字进行区分,很多客户端支持较差,同时多业务用多数据库实际还是单线程处理,会有干扰。

4.【推荐】使用批量操作提高效率

​ 原生命令:例如mget、mset。 非原生命令:可以使用pipeline提高效率。

但要注意控制一次批量操作的元素个数(例如500以内,实际也和元素字节数有关)。

注意两者不同:

原生命令是原子操作,pipeline是非原子操作。 pipeline可以打包不同的命令,原生命令做不到 pipeline需要客户端和服务端同时支持。

5.【建议】Redis事务功能较弱,不建议过多使用,可以用lua替代

客户端使用 避免多个应用使用同一个redis实例 对业务进行拆分,不同业务尽量使用不同的redis实例 使用连接池可以有效控制连接,提高效率 连接池

使用带有连接池的数据库,可以有效控制连接,同时提高效率,标准使用方式:

12345678910111213141516171819JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();jedisPoolConfig.setMaxTotal(5);jedisPoolConfig.setMaxIdle(2);jedisPoolConfig.setTestOnBorrow(true);JedisPool jedisPool = new JedisPool(jedisPoolConfig, "192.168.0.60", 6379, 3000, null);Jedis jedis = null;try { jedis = jedisPool.getResource(); //具体的命令 jedis.executeCommand()} catch (Exception e) { logger.error("op key {} error: " + e.getMessage(), key, e);} finally { //注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。 if (jedis != null) jedis.close();}

连接池参数含义:

序号 参数名 含义 默认值 使用建议 1 maxTotal 资源池中最大连接数 8 设置建议见下面 2 maxIdle 资源池允许最大空闲的连接数 8 设置建议见下面 3 minIdle 资源池确保最少空闲的连接数 0 设置建议见下面 4 blockWhenExhausted 当资源池用尽后,调用者是否要等待。只有当为true时,下面的maxWaitMillis才会生效 true 建议使用默认值 5 maxWaitMillis 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒) -1:表示永不超时 不建议使用默认值 6 testOnBorrow 向资源池借用连接时是否做连接有效性检测(ping),无效连接会被移除 false 业务量很大时候建议设置为false(多一次ping的开销)。 7 testOnReturn 向资源池归还连接时是否做连接有效性检测(ping),无效连接会被移除 false 业务量很大时候建议设置为false(多一次ping的开销)。 8 jmxEnabled 是否开启jmx监控,可用于监控 true 建议开启,但应用本身也要开启

优化建议:

1)maxTotal:最大连接数,早期的版本叫maxActive

实际上这个是一个很难回答的问题,考虑的因素比较多:

业务希望Redis并发量

客户端执行命令时间

Redis资源:例如 nodes(例如应用个数) * maxTotal 是不能超过redis的最大连接数maxclients(默认10000个)。

资源开销:例如虽然希望控制空闲连接(连接池此刻可马上使用的连接),但是不希望因为连接池的频繁释放创建连接造成不必靠开销。

以一个例子说明,假设:

一次命令时间(borrow|return resource + Jedis执行命令(含网络) )的平均耗时约为1ms,一个连接的QPS大约是1000 业务期望的QPS是50000

那么理论上需要的资源池大小是50000 / 1000 = 50个。但事实上这是个理论值,还要考虑到要比理论值预留一些资源,通常来讲maxTotal可以比理论值大一些。

但这个值不是越大越好,一方面连接太多占用客户端和服务端资源,另一方面对于Redis这种高QPS的服务器,一个大命令的阻塞即使设置再大资源池仍然会无济于事。

2)maxIdle和minIdle

maxIdle实际上才是业务需要的最大连接数,maxTotal是为了给出余量,所以maxIdle不要设置过小,否则会有new Jedis(新连接)开销。

连接池的最佳性能是maxTotal = maxIdle,这样就避免连接池伸缩带来的性能干扰。但是如果并发量不大或者maxTotal设置过高,会导致不必要的连接资源浪费。一般推荐maxIdle可以设置为按上面的业务期望QPS计算出来的理论连接数,maxTotal可以再放大一倍。

minIdle(最小空闲连接数),与其说是最小空闲连接数,不如说是”至少需要保持的空闲连接数“,在使用连接的过程中,如果连接数超过了minIdle,那么继续建立连接,如果超过了maxIdle,当超过的连接执行完业务后会慢慢被移出连接池释放掉。

如果系统启动完马上就会有很多的请求过来,那么可以给redis连接池做预热,比如快速的创建一些redis连接,执行简单命令,类似ping(),快速的将连接池里的空闲连接提升到minIdle的数量。

连接池预热示例代码:

123456789101112131415161718192021222324252627List minIdleJedisList = new ArrayList(jedisPoolConfig.getMinIdle());for (int i = 0; i < jedisPoolConfig.getMinIdle(); i++) { Jedis jedis = null; try { jedis = pool.getResource(); minIdleJedisList.add(jedis); jedis.ping(); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { //注意,这里不能马上close将连接还回连接池,否则最后连接池里只会建立1个连接。。 //jedis.close(); }}//统一将预热的连接还回连接池for (int i = 0; i < jedisPoolConfig.getMinIdle(); i++) { Jedis jedis = null; try { jedis = minIdleJedisList.get(i); //将连接归还回连接池 jedis.close(); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { }}

总之,要根据实际系统的QPS和调用redis客户端的规模整体评估每个节点所使用的连接池大小。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3