JRC Flink流作业调优指南

您所在的位置:网站首页 flink每天零点状态清空 JRC Flink流作业调优指南

JRC Flink流作业调优指南

2023-03-20 08:14| 来源: 网络整理| 查看: 265

本文正在参加「金石计划」

作者:京东物流 康琪

本文综合Apache Flink原理与京东实时计算平台(JRC)的背景,详细讲述了大规模Flink流作业的调优方法。通过阅读本文,读者可了解Flink流作业的通用调优措施,并应用于生产环境。

写在前面

Apache Flink作为Google Dataflow Model的工业级实现,经过多年的发展,如今已经成为流式计算开源领域的事实标准。它具有高吞吐、低时延、原生流批一体、高一致性、高可用性、高伸缩性的特征,同时提供丰富的层级化API、时间窗口、状态化计算等语义,方便用户快速入门实时开发,构建实时计算体系。

古语有云,工欲善其事,必先利其器。要想让大规模、大流量的Flink作业高效运行,就必然要进行调优,并且理解其背后的原理。本文是笔者根据过往经验以及调优实践,结合京东实时计算平台(JRC)背景产出的面向专业人员的Flink流作业调优指南。主要包含以下四个方面:

TaskManager内存模型调优 网络栈调优 RocksDB与状态调优 其他调优项

本文基于Flink 1.12版本。阅读之前,建议读者对Flink基础组件、编程模型和运行时有较深入的了解。

*01 TaskManager内存模型调优 1.1 TaskManager内存模型与参数

目前的Flink TaskManager内存模型是1.10版本确定下来的,官方文档中给出的图示如下。在高版本Flink的Web UI中,也可以看到这张图。

图1 TaskManager内存模型

下面来看图说话,分区域给出比官方文档详细一些的介绍。t.m.即为taskmanager. memory.前缀的缩写。

1.2 平台特定参数

除了TaskManager内存模型相关的参数之外,还有一些平台提供的其他参数,列举如下。

1.3 TM/平台参数与JVM的关系

上述参数与TaskManager JVM本身的参数有如下的对应关系:

-Xms | -Xmx → t. m. framework. heap. size + t. m. task. heap. size -Xmn → -Xmx * apus. taskmanager. heap. newsize. ratio -XX: Max Direct Memory Size → t. m. framework. off- heap. size + t. m. task. off- heap. size + $network -XX: Max Metaspace Size → t. m. jvm- metaspace. size

另外,还可以通过env.java.opts.{jobmanager | taskmanager}配置项来分别设定JM和TM JVM的附加参数。

1.4 内存分配示例

下面以在生产环境某作业中运行的8C / 16G TaskManager为例,根据以上规则,手动计算各个内存分区的配额。注意有部分参数未采用默认值。

t.m.process.size = 16384 t.m.flink.size = t.m.process.size * apus.memory.incontainer.available.ratio = 16384 * 0.9 = 14745.6 t.m.jvm-metaspace.size = [t.m.process.size - t.m.flink.size] * apus.metaspace.incutoff.ratio = [16384 - 14745.6] * 0.25 = 409.6 $overhead = MIN{t.m.process.size * t.m.jvm-overhead-fraction, t.m.jvm-overhead.max} = MIN{16384 * 0.1, 1024} = 1024 $network = MIN{t.m.flink.size * t.m.network.fraction, t.m.network.max} = MIN{14745.6 * 0.3, 5120} = 4423.68 $managed = t.m.flink.size * t.m.managed.fraction = 14745.6 * 0.25 = 3686.4 t.m.task.off-heap.size = t.m.flink.size * apus.taskmanager.memory.task.off-heap.fraction = 14745.6 * 0.01 = 147.4 t.m.task.heap.size = t.m.flink.size - $network - $managed - t.m.task.off-heap.size - t.m.framework.heap.size - t.m.framework.off-heap.size = 14745.6 - 4423.68 - 3686.4 - 147.4 - 128 - 128 = 6232.12 复制代码

与Web UI中展示的内存配额做比对,可发现完全吻合。

图2 Web UI展示的内存分配情况

1.5 调优概览

理解TaskManager内存模型是开展调优的大前提,进行调优的宗旨就是:合理分配,避免浪费,保证性能。下面先对比较容易出现问题的三块区域做简要的解说。

1.关于任务堆外内存

平台方的解释是有些用户的作业需要这部分内存,但从Flink Runtime的角度讲,主要是批作业(如Sort-Merge Shuffle过程)会积极地使用它。相对地,流作业很少涉及这一部分,除非用户代码或用户引用的第三方库直接操作了DirectByteBuffer或Unsafe之类。所以一般可以优先保证堆内存,即尝试将 apus.t.m.task.off-heap.fraction再调小一些(如0.05),再观察作业运行是否正常。

2.关于托管内存

如果使用RocksDB状态后端,且状态数据量较大或读写较频繁,建议适当增加t.m.managed.fraction,如0.2~0.5,可配合RocksDB监控决定。如果不使用RocksDB状态后端,可设为0,因为其他状态后端下的本地状态会存在TaskManager堆内存中。后文会详细讲解RocksDB相关的调优项。

3.关于网络缓存

需要特别注意的是,网络缓存的占用量与并行度和作业拓扑有关,而与实际网络流量关系不大,所以不能简单地以作业的数据量来设置这一区域。粗略地讲,对简单拓扑,建议以默认值启动作业,再观察该区域的利用情况并进行调整;对复杂拓扑,建议先适当调大t.m.network.fraction和max,保证不出现IOException: Insufficient number of network buffers异常,然后再做调整。另外,请一定不要把t.m.network.min和max设成相等的值,这样会直接忽略fraction,而这种直接的设定往往并不科学。下一节就来详细讲解Flink网络栈的调优。

02 网络栈调优 2.1 网络栈和网络缓存

图3 Flink网络栈

Flink的网络栈构建在Netty的基础之上。如上图所示,每个TaskManager既可以是Server(发送端)也可以是Client(接收端),并且它们之间的TCP连接会被复用,以减少资源消耗。

图中的小色块就是网络缓存(NetworkBuffer),它是数据传输的最基本单位,以直接内存的形式分配,承载序列化的StreamRecord数据,且一个Buffer的大小就等于一个MemorySegment的大小(t.m.segment-size,默认32KB)。TM中的每个Sub-task都会创建网络缓存池(NetworkBufferPool),用于分配和回收Buffer。下面讲解一下网络缓存的分配规则。

2.2 网络缓存分配规则

Flink流作业的执行计划用三层DAG来表示,即:StreamGraph(逻辑计划)→ JobGraph(优化的逻辑计划)→ ExecutionGraph(物理计划)。当ExecutionGraph真正被调度到TaskManager上面执行时,形成的是如下图所示的结构。

图4 Flink物理执行图结构

每个Sub-task都有一套用于数据交换的组件,输出侧称为ResultPartition(RP),输入侧称为InputGate(IG)。另外,它们还会根据并行度和上下游的DistributionPattern(POINTWISE或ALL_TO_ALL)划分为子块,分别称为ResultSubpartition(RS)和InputChannel(IC)。注意上下游RS和IC的比例是严格1:1的。网络缓存就是在ResultPartition和InputGate级别分配的,具体的分配规则是:

#Buffer-RP = #RS + 1 && #Buffer-RS


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3