使用RedisTemplate批量存入数据,100万测试 需要1分钟

您所在的位置:网站首页 redis批量查询多个key 使用RedisTemplate批量存入数据,100万测试 需要1分钟

使用RedisTemplate批量存入数据,100万测试 需要1分钟

2023-09-24 01:37| 来源: 网络整理| 查看: 265

目录

1.需求

2.基本配置

3.核心代码

4.dao

5.mapper/*.xml

6.数据表

7.工具类

参考文档

https://blog.csdn.net/xiaoliu598906167/article/details/82218525

https://blog.csdn.net/fouling/article/details/98631144

https://blog.csdn.net/supersub000/article/details/80100016

1.需求

将mysql数据库中的上百万数据存入redis中存入的格式为hashMap的形式 即,(hash路径,key, obj)其中的obj为map 封装一条记录信息 并且这些数据在用的时候可以取得到。 难点,普通通过循环hset的方式比较慢,一万条数据需要6秒左右,一百万需要10分钟左右 优化方案:redisTemplate的管道api executePipelined() 注意,仍需要将数据进行分段向redis中存入(不然被卡死),比如可以分段从数据库取然后分段存,  但是最好是一下子取出来,再分段存入,本案例是分段取分段存。

2.基本配置

   org.springframework.boot    spring-boot-starter-data-redis

基本配置类(必须)

package com.example.studyspringboot.studyboot.utils.stuRedis; import com.alibaba.fastjson.support.spring.GenericFastJsonRedisSerializer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration public class RedisConfig { @SuppressWarnings("rawtypes") @Autowired private RedisTemplate redisTemplate; /** * 解决redis插入中文乱码 * @return */ @SuppressWarnings({ "unchecked", "rawtypes" }) @Bean public RedisTemplate redisTemplateInit() { //设置序列化Key的实例化对象 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); //设置序列化Value的实例化对象 redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); redisTemplate.setHashValueSerializer(new GenericFastJsonRedisSerializer()); return redisTemplate; } }

配置文件

#MYSQL链接 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.url=jdbc:mysql://localhost:3306/redistest?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai spring.datasource.username=root spring.datasource.password=lps123 # AOP # spring.aop.auto=true # 默认使用cglib动态代理 # 强制使用jdk代理(不要这样做) #spring.aop.proxy-target-class=false #mybatis配置 mybatis.mapper-locations = classpath:/mapper/*.xml #mybatis.type-aliases-package=com.customerNoPlatform.entity #redis配置 #Redis服务器地址 spring.redis.host=127.0.0.1 #Redis服务器连接端口 spring.redis.port=6379 #Redis数据库索引(默认为0) spring.redis.database=0 #连接池最大连接数(使用负值表示没有限制) spring.redis.jedis.pool.max-active=50 #连接池最大阻塞等待时间(使用负值表示没有限制) spring.redis.jedis.pool.max-wait=3000 #连接池中的最大空闲连接 spring.redis.jedis.pool.max-idle=20 #连接池中的最小空闲连接 spring.redis.jedis.pool.min-idle=2 #连接超时时间(毫秒) spring.redis.timeout=5000 3.核心代码 package com.example.studyspringboot.studyboot.utils.stuRedis; import com.alibaba.fastjson.JSON; import com.example.studyspringboot.studyboot.dao.UserDao; import org.apache.catalina.Pipeline; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import javax.annotation.Resource; import java.util.List; import java.util.Map; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class Main { @Autowired private RedisTemplate redisTemplate; @Autowired private UserDao userDao; @Autowired RedisUtil redisUtil; /** * 读取缓存数据 */ @Test public void get() { // redisUtils.get("908687_name_2726||908687_sex_2726||908687_hobby_2726908687_height_2726"); Map value = (Map)redisUtil.hget("nbs:hello:leaf","908687_name_2726||908687_sex_2726||908687_hobby_2726908687_height_2726"); System.out.println(JSON.toJSONString(value)); System.out.println(value.get("name")); } @Test public void method2() { CustomersToRedis("nbs:hello:leaf",0); } /** * @Author lps * @Description //TODO * @Date 2020-07-25 15:21:44 星期六 * @Param [path, stepSize] pathhash值的路径,stepSize 步长多少数据查一次 * @return boolean **/ public boolean CustomersToRedis(String path,int stepSize) { long zero = System.currentTimeMillis(); //数据总量 int total = userDao.getCount(); if(total==0){ return false; } stepSize=stepSize==0?100000:stepSize; //分步 int step =(total%stepSize)==0?(total/stepSize):(total/stepSize+1); long startTime;//开始时间 long selectTime;//查询花费时间 long endTime;//结束时间 long saveTime;//存入redis花费时间 long dis; for(int j = 1;j 0) { redisTemplate.expire(key, time, TimeUnit.SECONDS); } log.info("指定redis缓存失效时间成功,key:{},time:{}",key,time); return true; } catch (Exception e) { e.printStackTrace(); log.info("指定redis缓存失效时间失败,key:{},time:{}",key,time); return false; } } /** * 根据key 获取过期时间 * * @param key 键 不能为null * @return 时间(秒) 返回0代表为永久有效 */ public long getExpire(String key) { long time=redisTemplate.getExpire(key, TimeUnit.SECONDS); log.info("根据key获取redis缓存失效时间,key:{},time:{}",key,time); return time; } /** * 判断key是否存在 * * @param key 键 * @return true 存在 false不存在 */ public boolean hasKey(String key) { try { boolean flag=redisTemplate.hasKey(key); log.info("判断redis是否包含key,key:{},return:{}",key,flag); return flag; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除缓存 * * @param key 可以传一个值 或多个 */ @SuppressWarnings("unchecked") public void del(String... key) { if (key != null && key.length > 0) { if (key.length == 1) { boolean flag=redisTemplate.delete(key[0]); log.info("从redis删除key,key:{},return:{}",key,flag); } else { long count=redisTemplate.delete(CollectionUtils.arrayToList(key)); log.info("从redis删除key,key:{},return:{}",key,count); } } } //============================String============================= /** * 普通缓存获取 * * @param key 键 * @return 值 */ public Object get(String key) { if(key==null) { log.info("从redis获取key,key:{},return:{}",key,null); return null; }else { Object object=redisTemplate.opsForValue().get(key); log.info("从redis获取key,key:{},return:{}",key,object); return object; } } /** * 普通缓存放入 * * @param key 键 * @param value 值 * @return true成功 false失败 */ public boolean set(String key, Object value) { try { redisTemplate.opsForValue().set(key, value); log.info("保存键值到redis成功,key:{},value:{}",key,value); return true; } catch (Exception e) { e.printStackTrace(); log.info("保存键值到redis失败,key:{},value:{}",key,value); return false; } } /** * 普通缓存放入并设置时间 * * @param key 键 * @param value 值 * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 * @return true成功 false 失败 */ public boolean set(String key, Object value, long time) { try { if (time > 0) { redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); log.info("保存键值到redis成功并设定失效时间,key:{},value:{},time:{},",key,value,time); } else { set(key, value); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } //================================Map================================= /** * HashGet * * @param key 键 不能为null * @param item 项 不能为null * @return 值 */ public Object hget(String key, String item) { return redisTemplate.opsForHash().get(key, item); } /** * 获取hashKey对应的所有键值 * * @param key 键 * @return 对应的多个键值 */ public Map hmget(String key) { return redisTemplate.opsForHash().entries(key); } /** * HashSet * * @param key 键 * @param map 对应多个键值 * @return true 成功 false 失败 */ public boolean hmset(String key, Map map) { try { redisTemplate.opsForHash().putAll(key, map); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * HashSet 并设置时间 * * @param key 键 * @param map 对应多个键值 * @param time 时间(秒) * @return true成功 false失败 */ public boolean hmset(String key, Map map, long time) { try { redisTemplate.opsForHash().putAll(key, map); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * * @param key 键 * @param item 项 * @param value 值 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value) { try { redisTemplate.opsForHash().put(key, item, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 向一张hash表中放入数据,如果不存在将创建 * * @param key 键 * @param item 项 * @param value 值 * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间 * @return true 成功 false失败 */ public boolean hset(String key, String item, Object value, long time) { try { redisTemplate.opsForHash().put(key, item, value); if (time > 0) { expire(key, time); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除hash表中的值 * * @param key 键 不能为null * @param item 项 可以使多个 不能为null */ public void hdel(String key, Object... item) { redisTemplate.opsForHash().delete(key, item); } /** * 判断hash表中是否有该项的值 * * @param key 键 不能为null * @param item 项 不能为null * @return true 存在 false不存在 */ public boolean hHasKey(String key, String item) { return redisTemplate.opsForHash().hasKey(key, item); } //============================set============================= /** * 根据key获取Set中的所有值 * * @param key 键 * @return */ public Set sGet(String key) { try { return redisTemplate.opsForSet().members(key); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 根据value从一个set中查询,是否存在 * * @param key 键 * @param value 值 * @return true 存在 false不存在 */ public boolean sHasKey(String key, Object value) { try { return redisTemplate.opsForSet().isMember(key, value); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将数据放入set缓存 * * @param key 键 * @param values 值 可以是多个 * @return 成功个数 */ public long sSet(String key, Object... values) { try { return redisTemplate.opsForSet().add(key, values); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 将set数据放入缓存 * * @param key 键 * @param time 时间(秒) * @param values 值 可以是多个 * @return 成功个数 */ public long sSetAndTime(String key, long time, Object... values) { try { Long count = redisTemplate.opsForSet().add(key, values); if (time > 0) expire(key, time); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 获取set缓存的长度 * * @param key 键 * @return */ public long sGetSetSize(String key) { try { return redisTemplate.opsForSet().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 移除值为value的 * * @param key 键 * @param values 值 可以是多个 * @return 移除的个数 */ public long setRemove(String key, Object... values) { try { Long count = redisTemplate.opsForSet().remove(key, values); return count; } catch (Exception e) { e.printStackTrace(); return 0; } } //===============================list================================= /** * 获取list缓存的内容 * * @param key 键 * @param start 开始 * @param end 结束 0 到 -1代表所有值 * @return */ public List lGet(String key, long start, long end) { try { return redisTemplate.opsForList().range(key, start, end); } catch (Exception e) { e.printStackTrace(); return null; } } /** * 获取list缓存的长度 * * @param key 键 * @return */ public long lGetListSize(String key) { try { return redisTemplate.opsForList().size(key); } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 通过索引 获取list中的值 * * @param key 键 * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index 0) expire(key, time); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, List value) { try { redisTemplate.opsForList().rightPushAll(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 将list放入缓存 * * @param key 键 * @param value 值 * @param time 时间(秒) * @return */ public boolean lSet(String key, List value, long time) { try { redisTemplate.opsForList().rightPushAll(key, value); if (time > 0) expire(key, time); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据索引修改list中的某条数据 * * @param key 键 * @param index 索引 * @param value 值 * @return */ public boolean lUpdateIndex(String key, long index, Object value) { try { redisTemplate.opsForList().set(key, index, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 移除N个值为value * * @param key 键 * @param count 移除多少个 * @param value 值 * @return 移除的个数 */ public long lRemove(String key, long count, Object value) { try { Long remove = redisTemplate.opsForList().remove(key, count, value); return remove; } catch (Exception e) { e.printStackTrace(); return 0; } } /** * 分布式锁 * * @param key key值 * @return 是否获取到 */ @SuppressWarnings("unchecked") public boolean lock(String key){ String lock = LOCK_PREFIX + key; return (Boolean) redisTemplate.execute((RedisCallback) connection -> { long expireAt = System.currentTimeMillis() + LOCK_EXPIRE + 1; Boolean acquire = connection.setNX(lock.getBytes(), String.valueOf(expireAt).getBytes()); if (acquire) { return true; } else { byte[] value = connection.get(lock.getBytes()); if (Objects.nonNull(value) && value.length > 0) { long expireTime = Long.parseLong(new String(value)); // 如果锁已经过期 if (expireTime < System.currentTimeMillis()) { // 重新加锁,防止死锁 byte[] oldValue = connection.getSet(lock.getBytes(), String.valueOf(System.currentTimeMillis() + LOCK_EXPIRE + 1).getBytes()); return Long.parseLong(new String(oldValue)) < System.currentTimeMillis(); } } } return false; }); } /** * 删除锁 * * @param key */ public void delete(String key) { redisTemplate.delete(key); } } 8.一次查询,分批插入的效率对比

核心代码

@Test public void method3() { long startTime = System.currentTimeMillis(); int total = userDao.getCount(); List resList = userDao.getLimitUser(0,total); long selectTime = System.currentTimeMillis(); System.out.println("查询数据花费时间:"+(selectTime-startTime)+"ms"); int len = resList.size();//实际数据总量(等于)total int stepSize=100000;//步长为10万; String path="nbs:hello:leaf"; int cnt=1; long startRedisTime; long endRedisTime= -System.currentTimeMillis(); List tempList = new ArrayList(); for(int i=0;i0){ startRedisTime = System.currentTimeMillis(); handleSave(path,tempList); endRedisTime = System.currentTimeMillis(); tempList.clear();//清除内存 System.out.println("存入redi花费时间:"+( endRedisTime-startRedisTime)+"ms"); } System.out.println("总共花费的时间为:"+(endRedisTime-startTime)+"ms"); }

运行结果

查询数据花费时间:17521ms 第1次向redis中存入数据 长度:100001 存入redi花费时间:6233ms 第2次向redis中存入数据 长度:100000 存入redi花费时间:4107ms 第3次向redis中存入数据 长度:100000 存入redi花费时间:3579ms 第4次向redis中存入数据 长度:100000 存入redi花费时间:3406ms 第5次向redis中存入数据 长度:100000 存入redi花费时间:3384ms 第6次向redis中存入数据 长度:100000 存入redi花费时间:3468ms 第7次向redis中存入数据 长度:100000 存入redi花费时间:3382ms 第8次向redis中存入数据 长度:100000 存入redi花费时间:3359ms 第9次向redis中存入数据 长度:100000 存入redi花费时间:3742ms 第10次向redis中存入数据 长度:100000 存入redi花费时间:4318ms 长度:24 存入redi花费时间:6ms 总共花费的时间为:56535ms

可能由于都是本地连接,而不是远程连接。所以看不出来差别。

(1)值得注意的是大量数据如果一次性从数据库读到内存中,操作起来程序就会崩溃。比如一次性读入300万的数据就无法继续进行存储操作了。所以这个一次性读入,分批存的想法经过测试是行不通的。

(2)另外随着数据量的增加,即便是分批从数据库读,再分批写写入redis中,效率也越来也低。100万数据需要1分钟。

  300百万数据却要4分钟。如果数据进一步增加的话可能就无法胜任了。具体如下:



【本文地址】


今日新闻


推荐新闻


    CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3