flink写入Redis优化提速

您所在的位置:网站首页 redis读写效率 flink写入Redis优化提速

flink写入Redis优化提速

#flink写入Redis优化提速| 来源: 网络整理| 查看: 265

问题背景

本项目组已上线的用户行为解析flink作业,基本的数据流程是读取并解析kafka中的用户行为日志流,并将解析完成的数据以有序集合的形式写入Redis,下游的特征平台再去Redis读取特征,为线上的模型服务提供低延迟的特征访问。

现存flink作业在大促、活动期间等大流量场景,会存在算子反压甚至作业重启的情况。通过监控定位到主要的性能瓶颈在写入Redis部分,因此,我们希望优化flink作业写入Redis的效率。

实现思路

目前作业是通过逐条数据写入的方式输出到Redis的。对于写入到Redis的每条记录,都需要经历发送命令-得到返回结果的过程,当我们操作大量数据的时候,Redis的吞吐量将大大降低。

Redis 管道(pipelining)支持命令的批量提交。使用Pipelining可以在客户端积攒多个命令,再批量发送给服务端,减少Redis的调用次数,从而提高数据的吞吐量。

考虑到在极端情况下,如上游输入作业运行失败时,pipeline中积攒的命令数一直达不到批量提交时设置的阈值,从而影响特征时延。因此在此次优化中我们还加入了定时写入的功能。

具体实现 1.批量写入

主要实现思路:

1、为每个线程维护一个队列cache,用于存储待写入的数据。每来一条行为数据记录,就进行解析并存入cache。

2、当cache中记录数达到了阈值,使用pipeline将这一批数据写入Redis。

这里使用的cache是ConcurrentLinkedQueue

类实例,它是一个基于链接节点的无界线程安全队列,采用先进先出的规则对节点进行排序。

@Override public void invoke(Data data) { String key = ...; double score = ...; String value = ...; Tuple3 tuple = new Tuple3(key, score, value); cache.add(tuple); if(cache.size()>=threshold) { // 达到批量阈值向pipeline发送命令 flush(); } } private void flush() { try(Jedis jedis = jedisPool.getResource()){ // try-with-resources statement,释放连接池资源 Pipeline pipeline = jedis.pipelined(); Tuple3 temp = cache.poll(); // 判断poll结果是否为null,来确认cache是否为空 while (temp != null) { String pos1 = (String) temp.getField(0); double pos2 = (double) temp.getField(1); String pos3 = (String) temp.getField(2); pipeline.zadd(pos1, pos2, pos3); // 将eventTime作为score加入有序集合 pipeline.expire(pos1, 172800); // 清除48h内无行为的key temp = cache.poll(); } pipeline.sync(); } }2.定时写入

主要实现思路:

定时写入的实现参考StreamingFileSink的实现。

1、通过properties获取Apollo上配置的批量写入大小threshold,和定时写入时间interval

2、在open方法中获取ProcessingTimeService,然后注册一个interval时长的定时器

3、实现ProcessingTimeCallback接口的onProcessingTime方法,注册定时器的执行方法,进行数据输出的同时,注册下一个定时器

@Override public void open(Configuration parameters) throws Exception { super.open(parameters); threshold = Integer.parseInt(properties.getProperty("realtime.batch.threshold")); // 100条,输入行为大约1000条/s interval = Long.parseLong(properties.getProperty("realtime.time.interval")); // 1000ms, 定时器每1s写入一次Redis cache = new ConcurrentLinkedQueue(); processingTimeService = ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(); long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); processingTimeService.registerTimer(currentProcessingTime + interval, this); jedisPool = new JedisPool(new GenericObjectPoolConfig(), properties.getProperty("realtime.redis.ip"), ***, ***, properties.getProperty("realtime.redis.pwd")); } public void onProcessingTime(long timestamp) throws Exception { final long currentTime = processingTimeService.getCurrentProcessingTime(); flush(); processingTimeService.registerTimer(currentTime + interval, this); }

压测结果

原始作业和优化后作业使用同样的资源配置,消费将近130万条行为记录。

表1:不同并行度下原始作业与优化后作业的输入/输出速率对比(record/s)

并行度12原始作业14732841优化后作业2200822008

可以发现:

1、在1个并行度的情况下,优化后作业的输入/输出速率是原始作业的15倍。优化后的作业满足了我们的性能需求。

2、在增加并行度时,原始作业的输入/输出速率成倍增加,但是优化后作业的性能并没有明显提升,主要原因是压测时我们使用的Redis实例做了流控,限制了我们写入数据的速率。

总结

这次实践通过简单的使用Redis pipeline就获得了如此大的性能提升,但在设计上仍有很多细节因素需要考虑,如基于DFX的考虑设计了额外的定时写入,连接池资源释放,如何保证线程安全等问题。

值得注意的是,写入速度提升主要是用jedis pipeline实现的。JedisCluster(3.8.0)之前是不支持pipeline的,4.0.1版本才支持cluster pipeline。

这里我们写入的是GaussDB for redis,因为底层的集群实现方式与原生redis不同。因此直接按redis standalone模式读写就好,而非cluster模式。

参考资料

[我们如何将 Flink 特征管道提速 7 倍]

[Flink Window机制与Timer]

[ConcurrentLinkedQueue使用和方法介绍 - yzl1990 - 博客园]

[如何通过Redis管道传输Pipeline优化性能_云数据库 Redis 版-阿里云帮助中心]



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3