Flink运行时之流处理程序生成流图

您所在的位置:网站首页 程序流程图转化为流图的规则 Flink运行时之流处理程序生成流图

Flink运行时之流处理程序生成流图

2024-07-15 14:08| 来源: 网络整理| 查看: 265

流处理程序生成流图

DataStream API所编写的流处理应用程序在生成作业图(JobGraph)并提交给JobManager之前,会预先生成流图(StreamGraph)。

什么是流图

流图(StreamGraph)是表示流处理程序拓扑的数据结构,它封装了生成作业图(JobGraph)的必要信息。它的类继承关系如下图所示:

StreamGraph-class-diagram

当你基于StreamGraph的继承链向上追溯,会发现它实现了FlinkPlan接口。

Flink效仿了传统的关系型数据库在执行SQL时生成执行计划并对其进行优化的思路。FlinkPlan是Flink生成执行计划的基接口,定义在Flink优化器模块中,流处理程序对应的计划是StreamingPlan,但是当前针对流处理程序没有进行优化,因此这个类可看作是一个预留设计。

一个简单的实现“word count”的流处理程序,其StreamGraph的形象化表示如下图:

Flink-StreamGraph

Flink官方提供了一个计划可视化器来图形化执行计划,该计划可视化器基于Flink API所生成的计划的JSON格式表示绘制图形。但是需要注意的是,计划的JSON形式表示缺失了很多属性以及部分节点(比如虚拟节点等);

上面的图是由“节点”和“边”组成的。节点在Flink中对应的数据结构是StreamNode,而边对应的数据结构是StreamEdge,StreamNode和StreamEdge之间有着双向的依赖关系。StreamEdge包含了其连接的源节点sourceVertex和目的节点targetVertex:

StreamEdge-structure

而StreamNode中包含了与其连接的入边集合inEdges和出边集合outEdges:

StreamNode-structure

StreamEdge和StreamNode都有唯一的编号进行标识,但是各自编号的生成规则并不相同。

StreamNode的编号id的生成是通过调用StreamTransformation的静态方法getNewNodeId获得的,其实现是一个静态计数器:

protected static Integer idCounter = 0; public static int getNewNodeId() { idCounter++; return idCounter; }

StreamEdge的编号edgeId是字符串类型,其生成的规则为:

this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames + "_" + outputPartitioner;

它是由多个段连接起来的,语义的文字表述如下:

源顶点_目的顶点_输入类型数量_输出选择器的名称_输出分区器

edgeId除了用来实现StreamEdge的hashCode及equals方法之外并没有其他实际意义。

StreamNode是表示流处理中算子的数据结构,source和sink在StreamGraph中也是以StreamNode表示,它们也是一种算子,只是因为它们是流的输入和输出因而有特定的称呼。

StreamNode除了存储了输入端和输出端的StreamEdge集合,还封装了算子的其他关键属性,比如其并行度、分区的键信息、输入与输出类型的序列化器等。

从直观上来看你已经知道了StreamNode和StreamEdge是StreamGraph的重要组成部分,但是为了生成JobGraph,StreamGraph很显然必须得包含更多的内容。总结一下,StreamGraph中包含的属性可分为三大类:

流处理程序的执行配置;流处理程序拓扑中包含的节点和边的信息;迭代相关的信息;

当然围绕这些属性的方法非常多,比如添加边和节点,创建迭代的source/sink等。

其中的一个关键方法getJobGraph将用于生成JobGraph:

public JobGraph getJobGraph() { if (isIterative() && checkpointConfig.isCheckpointingEnabled() && !checkpointConfig.isForceCheckpointing()) { throw new UnsupportedOperationException( "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. " + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. " + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)"); } StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this); return jobgraphGenerator.createJobGraph(); }

从上面的代码段也可见,当流处理程序中包含迭代逻辑时,检查点功能暂时不被支持,在异常信息中Flink阐述了缘由:在迭代作业中无法保证“恰好一次”的语义。

流处理程序依赖StreamingJobGraphGenerator来生成JobGraph,至于如何生成,后续会进行剖析。

生成流图的源码分析

了解了什么是流图(StreamGraph)之后,我们来分析它是如何生成的。流图的生成是通过StreamExecutionEnvironment的getStreamGraph实例方法触发的:

public StreamGraph getStreamGraph() { if (transformations.size()


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3