Flink原理与实现:详解Flink中的状态管理

您所在的位置:网站首页 flink状态的使用 Flink原理与实现:详解Flink中的状态管理

Flink原理与实现:详解Flink中的状态管理

2022-05-09 09:03| 来源: 网络整理| 查看: 265

【本文转自Flink原理与实现:详解Flink中的状态管理】

Flink原理与实现系列文章 :

Flink 原理与实现:架构和拓扑概览Flink 原理与实现:如何生成 StreamGraphFlink 原理与实现:如何生成 JobGraphFlink原理与实现:如何生成ExecutionGraph及物理执行图Flink原理与实现:Operator Chain原理

上面Flink原理与实现的文章中,有引用word count的例子,但是都没有包含状态管理。也就是说,如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和checkpoint。

首先区分一下两个概念,state一般指一个具体的task/operator的状态。而checkpoint则表示了一个Flink Job,在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态。

Flink通过定期地做checkpoint来实现容错和恢复。

State Keyed State和Operator State

Flink中包含两种基础的状态:Keyed State和Operator State。

Keyed State

顾名思义,就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,可能都对应一个state。

Operator State

与Keyed State不同,Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。相比较而言,在一个operator上,可能会有很多个key,从而对应多个keyed state。

举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。

原始状态和Flink托管状态 (Raw and Managed State)

Keyed State和Operator State,可以以两种形式存在:原始状态和托管状态。

托管状态是由Flink框架管理的状态,如ValueState, ListState, MapState等。

下面是Flink整个状态框架的类图,还是比较复杂的,可以先扫一眼,看到后面再回过来看:

image.png

通过框架提供的接口,我们来更新和管理状态的值。

而raw state即原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。

通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。

下文中所提到的状态,如果没有特殊说明,均为托管状态。

使用Keyed State

首先看一下Keyed State下,我们可以用哪些原子状态:

ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态将会在Flink未来版本中被删除)。MapState:即状态值为一个map。用户通过put或putAll方法添加元素。

以上所有的状态类型,都有一个clear方法,可以清除当前key对应的状态。

需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄(state handle)。

接下来看下,我们如何得到这个状态句柄。Flink通过StateDescriptor来定义一个状态。这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息。与上面的状态对应,从StateDescriptor派生了ValueStateDescriptor, ListStateDescriptor等descriptor。

具体如下:

ValueState getState(ValueStateDescriptor)ReducingState getReducingState(ReducingStateDescriptor)ListState getListState(ListStateDescriptor)FoldingState getFoldingState(FoldingStateDescriptor)MapState getMapState(MapStateDescriptor)

接下来我们看一下创建和使用ValueState的例子:

public class CountWindowAverage extends RichFlatMapFunction { /** * ValueState状态句柄. 第一个值为count,第二个值为sum。 */ private transient ValueState sum; @Override public void flatMap(Tuple2 input, Collector out) throws Exception { // 获取当前状态值 Tuple2 currentSum = sum.value(); // 更新 currentSum.f0 += 1; currentSum.f1 += input.f1; // 更新状态值 sum.update(currentSum); // 如果count >=2 清空状态值,重新计算 if (currentSum.f0 >= 2) { out.collect(new Tuple2(input.f0, currentSum.f1 / currentSum.f0)); sum.clear(); } } @Override public void open(Configuration config) { ValueStateDescriptor descriptor = new ValueStateDescriptor( "average", // 状态名称 TypeInformation.of(new TypeHint() {}), // 状态类型 Tuple2.of(0L, 0L)); // 状态默认值 sum = getRuntimeContext().getState(descriptor); } } // ... env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)) .keyBy(0) .flatMap(new CountWindowAverage()) .print(); // the printed output will be (1,4) and (1,5)

由于状态需要从RuntimeContext中创建和获取,因此如果要使用状态,必须使用RichFunction。普通的Function是无状态的。

KeyedStream上的scala api则提供了一些语法糖,让创建和使用状态更加方便:

val stream: DataStream[(String, Int)] = ... val counts: DataStream[(String, Int)] = stream .keyBy(_._1) .mapWithState((in: (String, Int), count: Option[Int]) => count match { case Some(c) => ( (in._1, c), Some(c + in._2) ) case None => ( (in._1, 0), Some(in._2) ) }) Inside Keyed State

上面以Keyed State为例讲了如何使用状态,接下来我们从代码层面分析一下,框架在内部做了什么事情。

先看下上面例子中open方法中获取状态句柄的代码:

sum = getRuntimeContext().getState(descriptor);

它调用了RichFlatMapFunction.getRuntimeContext().getState方法,最终会调用StreamingRuntimeContext.getState方法:

public ValueState getState(ValueStateDescriptor stateProperties) { KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties); stateProperties.initializeSerializerUnlessSet(getExecutionConfig()); return keyedStateStore.getState(stateProperties); }

checkPreconditionsAndGetKeyedStateStore方法中:

KeyedStateStore keyedStateStore = operator.getKeyedStateStore(); return keyedStateStore;

即返回了AbstractStreamOperator.keyedStateStore变量。这个变量的初始化在AbstractStreamOperator.initState方法中:

private void initKeyedState() { try { TypeSerializer keySerializer = config.getStateKeySerializer(getUserCodeClassloader()); // create a keyed state backend if there is keyed state, as indicated by the presence of a key serializer if (null != keySerializer) { KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(), container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(), container.getEnvironment().getTaskInfo().getIndexOfThisSubtask()); long estimatedStateSizeInMB = config.getStateSize(); this.keyedStateBackend = container.createKeyedStateBackend( keySerializer, // The maximum parallelism == number of key group container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(), subTaskKeyGroupRange, estimatedStateSizeInMB); this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig()); } // ... }

它先调用StreamTask.createKeyedStateBackend方法创建stateBackend,然后将stateBackend传入DefaultKeyedStateStore。

StreamTask.createKeyedStateBackend方法通过它内部的stateBackend来创建keyed statebackend:

backend = stateBackend.createKeyedStateBackend( getEnvironment(), getEnvironment().getJobID(), operatorIdentifier, keySerializer, numberOfKeyGroups, keyGroupRange, estimatedStateSizeInMB, getEnvironment().getTaskKvStateRegistry());

看一下statebackend的初始化,在StreamTask.createStateBackend方法中,这个方法会根据配置项state.backend的值创建backend,其中内置的backend有jobmanager, filesystem, rocksdb。

jobmanager的state backend会把状态存储在job manager的内存中。filesystem会把状态存在文件系统中,有可能是本地文件系统,也有可能是HDFS、S3等分布式文件系统。rocksdb会把状态存在rocksdb中。

所以可以看到,创建了state backend之后,创建keyed stated backend,实际上就是调用具体的state backend来创建。我们以filesystem为例,实际就是FsStateBackend.createKeyedStateBackend方法,这个方法也很简单,直接返回了HeapKeyedStateBackend对象。

先不展开说HeapKeyedStateBackend类,我们返回去看创建keyed state,最终返回的是DefaultKeyedStateStore对象,它的getState, getListState, getReducingState等方法,都是对底层keyed state backend的一层封装,keyedStateBackend.getPartitionedState来返回具体的state handle(DefaultKeyedStateStore.getPartitionedState方法)。

这个方法实际调用了AbstractKeyedStateBackend.getPartitionedState方法,HeapKeyedStateBackend和RocksDBKeyedStateBackend都从这个基类派生。

这个类有一个成员变量:

private final HashMap


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3