flink写入Redis优化提速 |
您所在的位置:网站首页 › redis读写效率 › flink写入Redis优化提速 |
问题背景 本项目组已上线的用户行为解析flink作业,基本的数据流程是读取并解析kafka中的用户行为日志流,并将解析完成的数据以有序集合的形式写入Redis,下游的特征平台再去Redis读取特征,为线上的模型服务提供低延迟的特征访问。 现存flink作业在大促、活动期间等大流量场景,会存在算子反压甚至作业重启的情况。通过监控定位到主要的性能瓶颈在写入Redis部分,因此,我们希望优化flink作业写入Redis的效率。 实现思路目前作业是通过逐条数据写入的方式输出到Redis的。对于写入到Redis的每条记录,都需要经历发送命令-得到返回结果的过程,当我们操作大量数据的时候,Redis的吞吐量将大大降低。 Redis 管道(pipelining)支持命令的批量提交。使用Pipelining可以在客户端积攒多个命令,再批量发送给服务端,减少Redis的调用次数,从而提高数据的吞吐量。 考虑到在极端情况下,如上游输入作业运行失败时,pipeline中积攒的命令数一直达不到批量提交时设置的阈值,从而影响特征时延。因此在此次优化中我们还加入了定时写入的功能。 具体实现 1.批量写入主要实现思路: 1、为每个线程维护一个队列cache,用于存储待写入的数据。每来一条行为数据记录,就进行解析并存入cache。 2、当cache中记录数达到了阈值,使用pipeline将这一批数据写入Redis。 这里使用的cache是ConcurrentLinkedQueue 类实例,它是一个基于链接节点的无界线程安全队列,采用先进先出的规则对节点进行排序。 @Override public void invoke(Data data) { String key = ...; double score = ...; String value = ...; Tuple3 tuple = new Tuple3(key, score, value); cache.add(tuple); if(cache.size()>=threshold) { // 达到批量阈值向pipeline发送命令 flush(); } } private void flush() { try(Jedis jedis = jedisPool.getResource()){ // try-with-resources statement,释放连接池资源 Pipeline pipeline = jedis.pipelined(); Tuple3 temp = cache.poll(); // 判断poll结果是否为null,来确认cache是否为空 while (temp != null) { String pos1 = (String) temp.getField(0); double pos2 = (double) temp.getField(1); String pos3 = (String) temp.getField(2); pipeline.zadd(pos1, pos2, pos3); // 将eventTime作为score加入有序集合 pipeline.expire(pos1, 172800); // 清除48h内无行为的key temp = cache.poll(); } pipeline.sync(); } }2.定时写入主要实现思路: 定时写入的实现参考StreamingFileSink的实现。 1、通过properties获取Apollo上配置的批量写入大小threshold,和定时写入时间interval 2、在open方法中获取ProcessingTimeService,然后注册一个interval时长的定时器 3、实现ProcessingTimeCallback接口的onProcessingTime方法,注册定时器的执行方法,进行数据输出的同时,注册下一个定时器 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); threshold = Integer.parseInt(properties.getProperty("realtime.batch.threshold")); // 100条,输入行为大约1000条/s interval = Long.parseLong(properties.getProperty("realtime.time.interval")); // 1000ms, 定时器每1s写入一次Redis cache = new ConcurrentLinkedQueue(); processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(); long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); processingTimeService.registerTimer(currentProcessingTime + interval, this); jedisPool = new JedisPool(new GenericObjectPoolConfig(), properties.getProperty("realtime.redis.ip"), ***, ***, properties.getProperty("realtime.redis.pwd")); } public void onProcessingTime(long timestamp) throws Exception { final long currentTime = processingTimeService.getCurrentProcessingTime(); flush(); processingTimeService.registerTimer(currentTime + interval, this); }压测结果 原始作业和优化后作业使用同样的资源配置,消费将近130万条行为记录。 表1:不同并行度下原始作业与优化后作业的输入/输出速率对比(record/s) 并行度12原始作业14732841优化后作业2200822008可以发现: 1、在1个并行度的情况下,优化后作业的输入/输出速率是原始作业的15倍。优化后的作业满足了我们的性能需求。 2、在增加并行度时,原始作业的输入/输出速率成倍增加,但是优化后作业的性能并没有明显提升,主要原因是压测时我们使用的Redis实例做了流控,限制了我们写入数据的速率。 总结这次实践通过简单的使用Redis pipeline就获得了如此大的性能提升,但在设计上仍有很多细节因素需要考虑,如基于DFX的考虑设计了额外的定时写入,连接池资源释放,如何保证线程安全等问题。 值得注意的是,写入速度提升主要是用jedis pipeline实现的。JedisCluster(3.8.0)之前是不支持pipeline的,4.0.1版本才支持cluster pipeline。 这里我们写入的是GaussDB for redis,因为底层的集群实现方式与原生redis不同。因此直接按redis standalone模式读写就好,而非cluster模式。 参考资料 [我们如何将 Flink 特征管道提速 7 倍] [Flink Window机制与Timer] [ConcurrentLinkedQueue使用和方法介绍 - yzl1990 - 博客园] [如何通过Redis管道传输Pipeline优化性能_云数据库 Redis 版-阿里云帮助中心] |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |