JRC Flink流作业调优指南

您所在的位置:网站首页 flink的配置 JRC Flink流作业调优指南

JRC Flink流作业调优指南

#JRC Flink流作业调优指南| 来源: 网络整理| 查看: 265

作者:京东物流 康琪

本文综合Apache Flink原理与京东实时计算平台(JRC)的背景,详细讲述了大规模Flink流作业的调优方法。通过阅读本文,读者可了解Flink流作业的通用调优措施,并应用于生产环境。

写在前面

Apache Flink作为Google Dataflow Model的工业级实现,经过多年的发展,如今已经成为流式计算开源领域的事实标准。它具有高吞吐、低时延、原生流批一体、高一致性、高可用性、高伸缩性的特征,同时提供丰富的层级化API、时间窗口、状态化计算等语义,方便用户快速入门实时开发,构建实时计算体系。

古语有云,工欲善其事,必先利其器。要想让大规模、大流量的Flink作业高效运行,就必然要进行调优,并且理解其背后的原理。本文是笔者根据过往经验以及调优实践,结合京东实时计算平台(JRC)背景产出的面向专业人员的Flink流作业调优指南。主要包含以下四个方面:

TaskManager内存模型调优 网络栈调优 RocksDB与状态调优 其他调优项

本文基于Flink 1.12版本。阅读之前,建议读者对Flink基础组件、编程模型和运行时有较深入的了解。

01 TaskManager内存模型调优 1.1 TaskManager内存模型与参数

目前的Flink TaskManager内存模型是1.10版本确定下来的,官方文档中给出的图示如下。在高版本Flink的Web UI中,也可以看到这张图。

图1 TaskManager内存模型

下面来看图说话,分区域给出比官方文档详细一些的介绍。t.m.即为taskmanager. memory.前缀的缩写。

1.2 平台特定参数

除了TaskManager内存模型相关的参数之外,还有一些平台提供的其他参数,列举如下。

1.3 TM/平台参数与JVM的关系

上述参数与TaskManager JVM本身的参数有如下的对应关系:

-Xms | -Xmx → t. m. framework. heap. size + t. m. task. heap. size -Xmn → -Xmx * apus. taskmanager. heap. newsize. ratio -XX: Max Direct Memory Size → t. m. framework. off- heap. size + t. m. task. off- heap. size + $network -XX: Max Metaspace Size → t. m. jvm- metaspace. size

另外,还可以通过env.java.opts.{jobmanager | taskmanager}配置项来分别设定JM和TM JVM的附加参数。

1.4 内存分配示例

下面以在生产环境某作业中运行的8C / 16G TaskManager为例,根据以上规则,手动计算各个内存分区的配额。注意有部分参数未采用默认值。

t.m.process.size = 16384 t.m.flink.size = t.m.process.size * apus.memory.incontainer.available.ratio = 16384 * 0.9 = 14745.6 t.m.jvm-metaspace.size = [t.m.process.size - t.m.flink.size] * apus.metaspace.incutoff.ratio = [16384 - 14745.6] * 0.25 = 409.6 $overhead = MIN{t.m.process.size * t.m.jvm-overhead-fraction, t.m.jvm-overhead.max} = MIN{16384 * 0.1, 1024} = 1024 $network = MIN{t.m.flink.size * t.m.network.fraction, t.m.network.max} = MIN{14745.6 * 0.3, 5120} = 4423.68 $managed = t.m.flink.size * t.m.managed.fraction = 14745.6 * 0.25 = 3686.4 t.m.task.off-heap.size = t.m.flink.size * apus.taskmanager.memory.task.off-heap.fraction = 14745.6 * 0.01 = 147.4 t.m.task.heap.size = t.m.flink.size - $network - $managed - t.m.task.off-heap.size - t.m.framework.heap.size - t.m.framework.off-heap.size = 14745.6 - 4423.68 - 3686.4 - 147.4 - 128 - 128 = 6232.12

与Web UI中展示的内存配额做比对,可发现完全吻合。

图2 Web UI展示的内存分配情况

1.5 调优概览

理解TaskManager内存模型是开展调优的大前提,进行调优的宗旨就是:合理分配,避免浪费,保证性能。下面先对比较容易出现问题的三块区域做简要的解说。

1.关于任务堆外内存

平台方的解释是有些用户的作业需要这部分内存,但从Flink Runtime的角度讲,主要是批作业(如Sort-Merge Shuffle过程)会积极地使用它。相对地,流作业很少涉及这一部分,除非用户代码或用户引用的第三方库直接操作了DirectByteBuffer或Unsafe之类。所以一般可以优先保证堆内存,即尝试将 apus.t.m.task.off-heap.fraction再调小一些(如0.05),再观察作业运行是否正常。

2.关于托管内存

如果使用RocksDB状态后端,且状态数据量较大或读写较频繁,建议适当增加t.m.managed.fraction,如0.2~0.5,可配合RocksDB监控决定。如果不使用RocksDB状态后端,可设为0,因为其他状态后端下的本地状态会存在TaskManager堆内存中。后文会详细讲解RocksDB相关的调优项。

3.关于网络缓存

需要特别注意的是,网络缓存的占用量与并行度和作业拓扑有关,而与实际网络流量关系不大,所以不能简单地以作业的数据量来设置这一区域。粗略地讲,对简单拓扑,建议以默认值启动作业,再观察该区域的利用情况并进行调整;对复杂拓扑,建议先适当调大t.m.network.fraction和max,保证不出现IOException: Insufficient number of network buffers异常,然后再做调整。另外,请一定不要把t.m.network.min和max设成相等的值,这样会直接忽略fraction,而这种直接的设定往往并不科学。下一节就来详细讲解Flink网络栈的调优。

02 网络栈调优 2.1 网络栈和网络缓存

图3 Flink网络栈

Flink的网络栈构建在Netty的基础之上。如上图所示,每个TaskManager既可以是Server(发送端)也可以是Client(接收端),并且它们之间的TCP连接会被复用,以减少资源消耗。

图中的小色块就是网络缓存(NetworkBuffer),它是数据传输的最基本单位,以直接内存的形式分配,承载序列化的StreamRecord数据,且一个Buffer的大小就等于一个MemorySegment的大小(t.m.segment-size,默认32KB)。TM中的每个Sub-task都会创建网络缓存池(NetworkBufferPool),用于分配和回收Buffer。下面讲解一下网络缓存的分配规则。

2.2 网络缓存分配规则

Flink流作业的执行计划用三层DAG来表示,即:StreamGraph(逻辑计划)→ JobGraph(优化的逻辑计划)→ ExecutionGraph(物理计划)。当ExecutionGraph真正被调度到TaskManager上面执行时,形成的是如下图所示的结构。

图4 Flink物理执行图结构

每个Sub-task都有一套用于数据交换的组件,输出侧称为ResultPartition(RP),输入侧称为InputGate(IG)。另外,它们还会根据并行度和上下游的DistributionPattern(POINTWISE或ALL_TO_ALL)划分为子块,分别称为ResultSubpartition(RS)和InputChannel(IC)。注意上下游RS和IC的比例是严格1:1的。网络缓存就是在ResultPartition和InputGate级别分配的,具体的分配规则是:

#Buffer-RP = #RS + 1 && #Buffer-RS

s.b.r.memory.high-prio-pool-ratio:高优先级区内存占托管内存的比例,默认值0.1。

剩余的部分(默认0.4)就是留给数据BlockCache的配额。用户一般不需要更改它们,若作业状态特别重读或重写,可适当调整,但必须先保证托管内存充足。

3.3 其他RocksDB参数

** 1.s.b.r.checkpoint.transfer.thread.num(默认1)**

每个有状态算子在Checkpoint时传输数据的线程数,增大此值会对网络和磁盘吞吐量有更高要求。一般建议4~8,1.13版本中默认已改为4。

** 2.s.b.r.timer-service.factory(社区版默认ROCKSDB,平台默认HEAP)**

Timer相关状态存储的位置,包含用户注册的Timer和框架内部注册的Timer(如Window、Trigger)。若存储在堆中,则Timer状态做CP时无法异步Snapshot,所以Timer很多的情况下存在RocksDB内更好。但美中不足的是,设置为ROCKSDB会有一个极偶发的序列化bug,导致无法从Savepoint恢复状态,若不能接受,建议HEAP。

** 3.s.b.r.predefined-options(默认DEFAULT)**

社区提供的预设RocksDB调优参数集,有4种:DEFAULT、SPINNING_DISK_OPTIMIZED、 SPINNING_DISK_OPTIMIZED_HIGH_MEM、FLASH_SSD_OPTIMIZED(名称都很self-explanatory)。该参数容易忽略,但强烈建议设置,比起默认值均有不错的性能收益。若单个Slot的状态量达到GB级别,且托管内存充裕,设为SPINNING_DISK_OPTIMIZED_HIGH_MEM最佳。其他情况设为SPINNING_DISK_OPTIMIZED即可。

除了上述参数之外,原则上建议遵循RocksDB Wiki的忠告("No need to tune it unless you see an obvious performance problem"),不再手动调整RocksDB高级参数(如s.b.r.{block | writebuffer | compaction}.*),除非出现了托管内存机制无法解决的问题。笔者也将部分高级参数列出如下,供参考。

图8 RocksDB高级参数

注意划线的项会被托管内存机制覆盖掉。如果经过慎重思考,必须fine tune RocksDB,则需要将s.b.r.memory.managed设为false,同时用户要承担可能的OOM风险。

3.4 RocksDB监控 & 调优示例

在大状态作业正式上线之前,应打开一部分必要的RocksDB监控,观察是否有性能瓶颈。开启监控对状态读写性能有一定影响,一般建议如下6项:

s.b.r.metrics.{block-cache-capacity | block-cache-usage | cur-size-all-mem-tables | mem-table-flush-pending | num-running-flushes | num-running-compactions} = true

观察完毕并解决问题后,请务必关闭它们。

图9 示例作业RocksDB监控

上图是示例作业的部分RocksDB Metrics图表,比较正常。如果在稳定消费阶段,Flush和Compaction等重量级操作特别频繁,以至于图中的点连成线,一般就提示RocksDB遇到了瓶颈。但是托管内存(即BlockCache)占用100%是正常现象,基本不必担心。

作为参考,该作业的增量Checkpoint大小在15G左右,每日摄入数十亿条状态数据,设置参数为:t. m. managed. fraction = 0.25(实际分配托管内存3.6G),s. b. r. predefined- options = SPINNING_ DISK_ OPTIMIZED,s. b. r. checkpoint. transfer. thread. num = 8。表现良好。而调优前作业的t. m. managed. fraction是默认的0.1,并且还对RocksDB高级参数做了一些无谓的修改,性能表现不佳。

3.5 状态TTL

RocksDB的状态TTL需要借助CompactionFilter实现,如下图所示。

图10 状态TTL原理

用户调用State Ttl Config# cleanupIn Rocksdb Compact Filter (N)方法,就可以设定在访问状态N次后,更新CompactionFilter记录的时间戳。当SST执行Compaction操作时,会根据该时间戳检查状态键值对是否过期并删除掉。注意若访问状态非常频繁,N值应适当调大(默认仅为1000),防止影响Compaction性能。

3.6 状态缩放与最大并行度

当作业的并行度改变并从CP / SP恢复时,就会涉及状态缩放的问题。Flink内Keyed State数据以KeyGroup为单位组织,每个key经过两重Murmur Hash计算出它应该落在哪个KeyGroup中,同时每个Sub-task会分配到一个或多个KeyGroup。如下图所示,并行度变化只会影响KeyGroup的分配,可以将状态恢复的过程近似化为顺序读,提高效率。

图11 Keyed State的缩放

KeyGroup的数量与最大并行度相同,而最大并行度改变会导致作业无法从CP / SP恢复,所以要谨慎设定。如果用户没有显式设置,就会根据以下规则来推算:

128



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3