基于Flink的高吞吐精确一致性入湖实现

您所在的位置:网站首页 flink提交 基于Flink的高吞吐精确一致性入湖实现

基于Flink的高吞吐精确一致性入湖实现

2023-02-28 18:38| 来源: 网络整理| 查看: 265

1.概览

AnalyticDB MySQL高度兼容MySQL协议,支持毫秒级更新,亚秒级查询,可以对海量数据进行即时的多维分析透视和业务探索;而最新推出的AnalyticDB MySQL湖仓版(下文简称ADB湖仓版)支持低成本离线处理能力完成数据的清洗加工,同时提供高性能在线分析能力完成数据的洞察探索,真正做到数据湖的规模,数据库的体验。帮助企业降本增效,构建企业级数据分析平台。

APS(ADB Pipeline Service)简介:ADB湖仓版在深化自身湖仓能力建设的同时,还推出了APS(ADB Pipeline Service)数据通道组件,为客户提供实时数据流服务实现数据低成本、低延迟入湖入仓。本文以数据源SLS如何通过APS实现高速精确一致性入湖为例,介绍相关的挑战和解决方法。在数据通道的构建上,我们选择Flink作为基础引擎。Flink作为业界熟知的大数据处理框架,其流批一体架构有助于我们处理多种场景。而ADB的湖则建设在Hudi之上,Hudi作为成熟的数据湖底座已经被多家大型企业实际应用,ADB在其上也已积累多年经验,如今ADB湖仓版把湖和仓进行深度融合提供更加一体化的解决方案。

数据入湖的精确一致性挑战:在入湖通道中,有可能出现异常,升级,扩缩容等场景导致链路重启,触发部分已处理数据从源端重放,导致在目标端出现重复数据。为解决此问题一种做法是配置业务主键,并使用Hudi的Upsert能力来达到幂等写入目的。但SLS入湖的吞吐目标是每秒GB级别(某个业务的需求是4GB/s)且需要控制成本,Hudi Upsert难以满足需求,而SLS数据本身就具有Append特征,因此采用Hudi的Append Only模式写入实现高吞吐,并用其他机制保证数据不重不丢的精确一致性。

2.端到端精确一致性的问题和解决方案

流计算的一致性保障一般包含如下几种:

精确一致的语义是所有一致性语义中要求最高的,但流计算中的Exactly-Once一般是指内部状态的精确一致性(Exactly-Once State Consistency),而业务需要的是端到端Exactly Once,即当出现异常Failover场景时,最终目标端的数据需要与源端保持一致,数据不重不丢。

2.1 端到端精确一致性问题

要实现精确一致性,就必然要考虑Failover场景,即当系统宕机任务重启时,如何恢复到某个一致性状态。Flink之所以称为Stateful Stream Processing是其可通过Checkpoint机制保存状态到后端存储,并在重启时从后端存储恢复到某个一致性状态。但在状态的恢复中,其仅仅保证了Flink自身的状态一致,而在包含源端、Flink、目的端这样的完整系统中,仍可能产生数据丢失或重复,导致端到端出现不一致。下面用一个字符连接的例子说明数据重复问题。

下图是一个字符串连接的处理过程,处理逻辑是从源端读取一个个的字符,并对它们进行连接,每连接一个字符就向目标端输出一次,最终输出a, ab, abc ...多个不重复的字符串。

本例的场景中,Flink的Checkpoint保存了已完成的字符连接ab,以及对应的源端位点(Checkpoint箭头指向的位点)。Current则指向当前正在处理的位点,此时已经向目标端输出了a, ab, abc。当发生异常重启时,Flink从Checkpoint恢复自身状态,回退位点并重新处理字符c,并再次向目标端输出abc,导致abc出现重复。

在这个例子中,Filnk通过Checkpoint恢复状态,因此不会出现abb,或者abcc这种重复处理字符的情况,也不会出现ac种丢失字符的情况,保证了其自身的Exactly-Once。但是在目标端出现了两个重复的abc,因此没有保证端到端的Exactly-Once

2.2 端到端精确一致性方案

Flink本身是一种复杂的分布式系统,其内部包含Source、Sink等算子,同时还存在Slot等并行关系,这样的系统要实现精确一致一般会想到两阶段提交,实际上Flink的Checkpoint就是一种两阶段提交的实现。

而在端到端中,Flink和Hudi又组成了另一个分布式系统,这个分布式系统要实现精确一致性,就需要另一套两阶段提交的实现(我们这里不讨论SLS端,因为在本场景中Flink不会改变SLS的状态,只利用SLS的位点重放能力即可)。因此端到端中,是由Flink和Flink + Hudi两套两阶段提交来保证精确一致性(见下图)。

Flink的Checkpoint两阶段实现不再赘述,后文会重点介绍Flink + Hudi两阶段提交的实现,定义出哪些是Precommit阶段,哪些是Commit阶段,同时发生异常时如何从故障中恢复保证Flink和Hudi的状态一致。例如Flink已经完成Checkpoint,而Hudi尚未完成Commit,如何恢复到一致性状态,这些在后续章节介绍。

3.SLS入湖链路端到端精确一致实现

下面介绍SLS入湖链路精确一致性的实现。整体架构如下,Hudi的组件是部署在Flink JobManager和TaskManager上的。SLS作为数据源,由Flink读取处理后,写出到Hudi表。因为SLS是多shard存储,因此会由Flink的多个Source算子并行读取。数据读取后通过Sink算子调用Hudi Worker写出到Hudi表。当然实际的链路中还会有Repartition,热点打散等逻辑,这些在图中做了简化。Flink Checkpoint的后端存储,以及Hudi数据的存储都是放在OSS上。

3.1 SLS Source算子

如何实现Source算子消费SLS数据已有大量介绍,此处不在赘述。这里介绍SLS的两种消费模式:消费组模式普通消费模式以及他们的区别。

▷ 消费组模式

顾名思义,多个消费者可注册到同一个消费组,SLS会自动把Shard分配给这些消费者来读取。其优点是由SLS的消费组来管理负载均衡。如下图左,消费组中首先注册了两个消费者,因此SLS把6个Shard均匀分配给这2个消费者。当有新的消费者注册(如下图右),则SLS会自动均衡,把部分Shard从旧消费者迁移到新消费者,称为shard transfer。

这种模式的优点是自动均衡,且在SLS Shard分裂/合并时会自动分配消费者。但该模式在我们的场景中却会引起问题。为保证精确一致,我们把SLS各Shard的当前位点保存在Flink Checkpoint中,运行中也是由各Slot上的Source算子持有当前消费位点。如果发生shard transfer,如何保证旧Slot上的算子不再消费,同时把位点转移给新Slot,这引入了新的一致性问题。尤其是大规模系统有数百个SLS Shard和数百个Flink Slot的情况下,很可能出现部分Source比其他Source先注册到SLS导致shard transfer不可避免。

普通消费模式

这种模式就是调用SLS的SDK直接指定shard、offset来消费数据,而不是由SLS消费组进行分配,因此不会出现shard transfer。如下,因为Flink的Slot为3,因此可提前计算出每个消费者消费2个Shard并据此分配,即使Source 3尚未ready,也不会把Shard 5和6分配给Source 1和2。可以想象,为了负载均衡(例如某些TaskManager的负载过高时),仍然需要迁移shard,但此时迁移是我们主动触发的,状态更加可控,从而避免一致性问题。

3.2 Hudi Sink算子

下面介绍下Hudi提交的相关概念,以及如何与Flink配合实现两阶段提交和容错达到精确一致。

3.2.1 Hudi提交相关概念

时间轴和Instant

Hudi维护着一条Timeline,Instant是指某个时间点(Time)发起的对表的操作(Action)及表所处的状态(State)的集合。一个Instant可以理解为一个数据版本,Action可能是对表的Commit,Rollback或者Clean等操作,这些操作由Hudi保证了其原子性,因此Hudi的Instant实际类似于数据库中的事务和版本的概念。在图中我们用Start Transaction,Write Data,Commit这种类似数据库事务的方式来表达某个Instant的执行过程。Instant中,部分Action的含义如下:

Commit:将记录原子写入数据集Rollback :当Commit不成功时进行回滚,其会删除在写入中产生的脏文件Clean :删除数据集中不再需要的旧版本和文件

Instant State一共有三种状态:

Requested:操作已被计划但未被执行,可以理解为Start TransactionInflight:操作正在进行,可理解为Write DataCompleted:操作完成,可理解为Commit

Instant的Time、Action和State都在元数据文件中描述,下图表示了时间轴上两个先后的Instant。Instant 1的.hoodie目录下的元数据文件描述了Instant的开始时间是2022-10-17 16:05:00,Action是Commit,State可以看到已完成提交(有20221017160500.commit文件),在表的分区目录下则是该Instant对应的parquet数据文件。而Instant 2则可以看到发生在第二天,且Action正在执行尚未提交。

Hudi提交过程

Hudi提交过程可以用下图理解。Hudi中存在两种角色,Coordinator负责发起Instant和完成提交,Worker负责写入数据。

当开启一个事务时,Coordinator会分配一个Instant并传递给所有worker;Worker开始写入数据;开始提交时,Coordinator发送commit给各Worker。各Worker收到提交命令,flush data到持久化存储,并反馈自己的状态给Coordinator。Coordinator确认各Worker commit完成,然后在.hoodie目录中写commit文件完成全局提交。

3.2.2 Flink + Hudi的两阶段提交

了解了Hudi的写入和提交过程,它如何与Flink配合完成数据的写入和提交就可以用下图表达了。

开启一个Hudi Instant;由Filnk Sink发送数据给Hudi Worker写出;发生Flink Checkpoint时,则通过Sink算子通知Worker flush数据,同时持久化operator-state(operator-state属于Flink checkpoint框架的一部分,持久化Hudi Worker所处的Instant等信息);当Flink完成Checkpoint的持久化,则通过notifyCheckpointComplete机制通知Hudi Coordinator提交该Instant。Hudi Coordinator此时完成最终提交,写commit文件,数据对外可见;结束Instant后,会立即开启一个新的Instant,重启上述循环。3.2.3 Flink + Hudi两阶段提交的容错处理

实际的提交可简化为如上的流程。从图中可见,1到3是Flink的Checkpoint逻辑,如果异常在这些步骤上发生,则认为Checkpoint失败,触发Job重启,从上一次Checkpoint恢复,相当于两阶段提交的Precommit阶段失败,事务回滚。当3到4之间发生异常,则会出现Flink和Hudi状态不一致。此时Flink认为Checkpoint已结束,而Hudi实际尚未提交。如果对此情况不做处理,则发生了数据丢失,因为Flink Checkpoint完毕后,SLS位点已经前移,而这部分数据在Hudi上并未完成提交,因此容错的重点是如何处理此阶段引起的一致性问题。

解决方法是Flink Job重启并从Checkpoint恢复时,发现Hudi最新的Instant有未提交的写入,需要保证执行Recommit。Recommit的流程如下图所示。

之前已提到Hudi Worker在Checkpoint时除了flush data,还持久化了一个operator-state,在这里记录了Worker当时所处的Instant信息。Job从Checkpoint恢复时,Sink算子会读取operator-state,Hudi Worker从中恢复持久化的Instant信息;Hudi worker汇报给自己的Instant给Coordinator;Hudi Coordinator会从Instant Timeline中获取最新的Instant信息,并接收所有Worker的汇报;如果Worker汇报的Instant与Timeline中最新的一样,且该Instant尚未提交,则触发Recommit。如果Worker汇报的Instant与最新的不同,则认为上一次Instant执行失败,这份数据对用户不可见,回滚掉即可。

可以想像下是否会存在重启时,部分Hudi worker在最新的Instant,而部分worker在旧的Instant的情况?答案是不会,因为Flink的Checkpoint就是相当于两阶段提交的Precommit阶段,如果Checkpoint完成则说明Hudi Precommit完成,所有Worker都处于最新Instant。如果Checkpoint失败,则重启时会回到上一个Checkpoint,此时Hudi worker所处的状态也是一致的,全部都回退到旧Instant。

4.总结

在数据入湖异常时的Failover处理中,Source通过Checkpoint中持久化的SLS位点,不会重放已处理的数据,保证数据不重,Sink通过Flink和Hudi配合实现的两阶段提交和Recommit机制,保证数据不丢,最终实现Exactly-Once。经过实测这套机制对性能的影响约在3% ~ 5%,以极小的代价保证精确一致性的情况下,实现了高吞吐实时入湖。在某个海量日志入湖项目中,日常吞吐达到3GB/s,峰值吞吐达到5GB/s,数据通道稳定运行,并配合ADB湖仓版的离在线一体化引擎,实现了用户的数据实时入湖,离在线一体化分析需求。

除了精确一致性外,为实现高吞吐写和查,数据通道中还有自动热点打散,小文件合并等诸多挑战,将在后续文章中进行介绍。

AnalyticDB MySQL湖仓版已正式上线商用,对于低成本离线处理ETL有需求,同时又需要使用高性能在线分析支撑BI报表/交互式查询/APP应用的用户,欢迎购买和体验!另外对湖仓版感兴趣的客户也依然可以填写问卷来进行试用,点击链接填写问卷。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3