深入解析 Kafka Exactly Once 语义设计 & 实现

您所在的位置:网站首页 kafka生产者发送超时 深入解析 Kafka Exactly Once 语义设计 & 实现

深入解析 Kafka Exactly Once 语义设计 & 实现

2023-10-24 21:19| 来源: 网络整理| 查看: 265

本篇文章主要介绍 Kafka 如何在流计算场景下保证端到端的 Exactly Once 语义,通过其架构上的设计以及源码分析帮助读者理解背后的实现原理。

什么是 Exactly-Once?

消息的投递语义主要分为三种:

At Most Once: 消息投递至多一次,可能会丢但不会出现重复。At Least Once: 消息投递至少一次,可能会出现重复但不会丢。Exactly Once: 消息投递正好一次,不会出现重复也不会丢。

Kafka 实现的主要是针对流计算场景下的 Exactly Once 能力,当中有个重要的限制条件,数据必须取自 Kafka 且计算结果也必须保存到 Kafka。如果流数据的状态存储依赖的是外部系统(常见的传统消息队列使用场景),则可能无法在系统出现故障时保证 Exactly Once。比如消费端消费一批数据后,在尚未提交消费位点的情况下崩溃重启,此时可能会消费到重复的消息(当然客户端可以自己实现幂等),从而造成错误的计算结果。

所以注意,Kafka 并未实现一般意义上消息队列服务水平中的 Exactly Once 语义(目前市面上也并未有其他消息产品实现),以下所提到的 Exactly Once 也均是基于流计算场景出发。

为什么流计算场景需要 Exactly-Once 能力?

Kafka 实现 Exactly-Once 语义的主要出发点是为了应对流计算的 “Consume-Process-Produce” 任务,其主要流程如下:

流处理应用(e.g. Kafka Streams)从源主题中消费到一条消息 A。触发函数对消息 A 进行处理并更新其状态。生产一至多条消息 B1 - Bn 至目标主题。等待来自目标主题所属 Broker 的响应。向源主题主动提交消息A的位点,表示该消息已被处理,并等待响应。

可以看到在 “Consume-Process-Produce” 任务中,流处理应用会同时作为消费者和生产者对流数据进行处理,而 At Least Once 的语义可能会造成重复消息的出现,比如以下两种常见场景:

当流处理应用成功发送消息并等待来自目标主题所属 Broker 的响应时,本该抵达的 ACK 因网络问题而丢失,导致应用认为消息发送失败而进行重试,发送成功后服务端会出现重复消息。

流处理应用发送消息至目标主题并成功获得响应,但在即将把位点(消费进度)提交至源主题前应用发生崩溃。应用重启后,重新从源主题消费到先前已处理过的消息,再次发送该条消息成功后,将会导致服务端出现重复消息。

根据以上示例我们可以得知,重复消息在 At-least-once随时都有可能出现,由于在流计算场景中,我们不会希望出现数据被重复处理的情况(比如网站浏览数被多算或少算一个),所以如何能够去除重复数据以达成精确一次(Exactly Once)投递至关重要。

Kafka 如何对消息进行去重?

要对消息进行去重,最直接的办法便是对每条消息进行唯一编号并在服务端对这些编号进行维护,每当有新消息便遍历检查是否存在重复的消息编号,如果有则拒绝写入。然而,这样的实现在消息量较大时,效率非常低。

一种更高效的方案是对每个生产者进行唯一编号(Producer ID, PID)并让其维护消息序列号,该序列号会随着消息的发送递增。这种方式同样可对消息进行唯一标识,且服务端不再需要对所有消息编号进行存储。取而代之的是,它只需维护各 PID 及其对应的最高消息序列号映射,以验证新消息的序列号是否有序和没有重复。

另一个问题是生产者应以全局(集群)还是分区维度维护其序列号?试想一个客户端可能发消息至多个分区,如果以全局的方式维护序列号,对客户端来说实现较为简单,但服务端就需较高成本来判断消息是否有序。如果以分区维度维护,则能够让服务端更高效地进行消息的有序验证(合法序列号 = 生产者已发送至该分区的消息最高序列号 + 1)。

总的来说,利用 PID + 以分区维度维护的消息序列号,服务端便可对接收到的消息进行如下判断:

如果消息序列号小于 PID 在当前分区所对应的最高序列号,说明消息出现重复,放弃写入至日志。如果消息序列号大于 PID 在当前分区所对应的最高序列号 + 1,说明消息出现乱序,抛出异常信息并拒绝写入。

补充一点,针对每个 Topic-Partition,Broker 会在其内存中维护每个 PID 及序列号(成功写入日志的消息)的映射关系,然而这样在 Broker 重启时,将会需要读取所有日志来恢复该状态。因此, Kafka 引入 PID Snapshot 机制,定期针对该映射关系和日志位点做快照,以加速状态恢复效率。

 

然而,这样就能百分百保证消息有序了吗?

根据 Kafka 的设置,PID 对每个分区可同时发送最多5个未 ACK 的请求(服务端规定最多只能缓存每个 PID 在每个 Topic-Partition 最近的 5 个请求)。试想下如果我们一次发送 6 个请求,除了第一条其余都返回 ACK 成功。此时客户端会对第一条消息进行重试,但由于服务端仅能缓存 5 条最近的消息的缘故,所以其无法对重试的第一条消息的序列号判重。

为此,Kafka 实现了以下机制来对有序性做保证:

生产者不可一次发送超过服务端允许的未 ACK 请求数(5 个)。当服务端发现消息序列号大于预期,代表消息出现乱序,则将后续的乱序请求一并拒绝并返回异常进行重试(e.g. 第一个请求发送失败/超时,则后续同时发送的其他请求也需跟着重试)。当出现重试请求时,会按照序列号大小将请求重新放到发送队列中的正确位置。客户端发送重试请求前,会检查当前的重试请求是否是之前未发送成功批次的首个请求,如果不是,则需等待首个请求被重新加入到发送队列中(等于最多允许发送的未 ACK 请求数动态减少到 1)。

这便是 Kafka 为解决因重试导致的消息重复问题所引入的幂等生产者(Idempotent Producer)基本思想,其主要针对以下场景:

服务端接收消息并保存后,发送 ACK 失败,生产者认为该消息发送失败而进行重试导致消息出现重复。消息 A 发送失败,消息 B 发送成功,消息 A 经过重试后发送成功所产生的消息乱序问题。

然而,幂等生产者仅能保证在单会话 & 单分区的 Exactly Once 语义,它仍然没有解决在流计算场景下的一些常见问题:

流处理应用在收到消息 ACK 前重启,此时服务端无法分辨前后请求来自同个生产者,给重启后的应用分配了新的 PID,故无法正确判断其对应的消息序列号,进而导致无法在消息重投情况下做去重。流处理应用在消费一批消息并处理后,可能会将计算结果发往不同分区,我们不希望出现部分发送成功,部分失败的状态。流处理应用成功发送消息至目标 Topic 且返回 ACK 成功,但在向源 Topic 提交位点前应用发生崩溃而重启。这种情况下,应用重启后将会消费到重复消息,导致计算处理结果出现错误。

综上所述,可以总结出幂等生产者在保证 Exactly Once 机制上的两大缺陷:1. 应用崩溃重启后,无法保证新会话发送的消息依旧能够保持幂等;2. 无法保证多分区/多个读写操作的原子性。为此,Kafka 引入了事务性机制来加强对 Exactly Once 语义的保证,接下来我们就来看看 Kafka 是如何通过事务来一一解决这些问题的。

Kafka 如何保证跨会话的幂等性写入?

由于幂等生产者的的 PID 会随着每次应用的重启而更新,导致跨会话的情况下,服务端无法定位生产者重启前所发送的消息最高序列号为消息做去重。

为此,Kafka 引入了稳定的唯一 ID —— Transactional ID(事务 ID)来标识一组事务的操作,在跨会话的情况下,只要使用相同的事务 ID,即可接续之前的事务状态继续操作(e.g. 故障恢复后主动回滚上次未完成的事务)。而不同于 PID 的是,事务 ID 是通过用户提供的,而 PID 则是由服务端进行分配。

这里可能会出现问题,如果当前多个生产者使用了相同的事务 ID(e.g. 生产者假死后复活,然而已有新的生产者接续之前未完成的事务),是否会发生脑裂问题(多个生产者可能针对同一事务进行操作)呢?

为此,Kafka 实现了个防护(fencing)机制,引入 epoch 的概念来隔离掉僵尸生产者,服务端在生产者发送事务初始化请求时,便会记录该事务 ID 所对应的最新 epoch 并连同 PID 一同返回给生产者,如果有来自相同事务 ID 的初始化请求便将其对应的 epoch 加 1。这样便能够隔离掉来自较早创建(epoch 较小)的生产者请求。

Transactional ID + 生产者 epoch 解决了幂等生产者无法保证跨会话幂等写入的问题,但同时还需配合服务端针对事务性机制的实现,才可确保新会话启动后,任何先前未完成的操作都已提交/回滚,处于一个“干净”的状态。这就涉及到我们再来要讨论的 Kafka 在多分区写入或有多个读写操作的情况下,如何保证事务的原子性。

Kafka 如何保证多分区写入/多个读写操作的原子性?

Kafka 为了解决多分区写入/多个读写操作的原子性问题,在服务端 Broker 间引入了个关键新角色 —— Transaction Coordinator(事务协调者)。事务协调者主要的职责在于管理事务相关的元数据(e.g. 事务 ID、PID、epoch 等)。

当事务开始后,每当生产者要向一个新 Topic-Partition 发送消息,便会向协调者同步自己要操作哪个 Topic-Partition,之后才会正式向对应分区发消息,待事务执行完后再告诉协调者事务的执行结果。

让我们继续分析,一个事务内所发送的消息具体是如何被提交/回滚的?我们知道 Kafka 有个分区 Leader 的角色(负责读/写该分区消息的 Broker),一个事务内的消息都将被发往各分区 Leader。如果事务完成后,由生产者分别向各个分区 Leader 告知事务消息可提交/回滚,这样对于客户端的压力是否有些大?

如同前面所说,生产者会在正式向各分区 Leader 发送消息前,会同步自己要将消息发往哪个 Topic-Partition 给协调者,事务执行完后再告知协调者事务的执行结果,这说明协调者掌握了事务所涉及的关键信息,并具备了操作整个事务的主动权。因此,当协调者拿到事务的执行结果后,便可由它来负责通知各分区 Leader 要对事务消息进行提交/回滚。

接下来,同一事务的消息在还未被提交/回滚前会以什么形式待在服务端呢?如果放在内存肯定是不现实的,一个长事务可能会包含很多消息,造成内存压力过大,所以必定会需要对这些未提交/回滚的消息进行持久化。但如果将事务消息存放到磁盘,进行回滚操作的过程又可能会涉及到多次磁盘的读写,效率极低。

为此,Kafka 引入了新的消息类型 —— Control Messages(控制消息),简单来说就是事务执行结果的标记,协调者会将控制消息发往各分区 Leader 来通知当前事务所涉及的消息应该被提交/回滚。控制消息会和普通消息一同持久化,客户端则会连同控制消息一块消费,并通过它得到事务的执行结果。

到这里,我们可以知道事务协调者这个角色的重要性,然而协调者也有可能出现挂掉的情况,此时协调者的角色可能会随着服务端的 HA 机制(Kafka 多副本机制)转移到其他 Broker,这样要如何保证事务状态信息不会在转移过后丢失呢?

为此,Kafka 引入了个新的内部 Topic (不会被客户端消费)—— __transaction_state,该 Topic 是个 Compacted Topic(简单来说就是 KV 存储),协调者所有的元数据信息都会被持久化到其对应的日志。加上 Kafka 本身的多副本机制,保证了协调者在故障恢复后能够正确恢复状态。

到这里就结束了吗?别忘了我们还需消费端的配合才能够达成端到端的 Exactly Once。

在事务尚未完成的情况下,消息照理来说不应被客户端读取到,因此其中一个可行的办法是在客户端先缓存尚未提交/回滚的事务消息,待消费到控制消息后再判断是否允许客户端获取到数据。然而,这对客户端内存的压力较大,不太合适。

如果是通过服务端来保证消费者能够获取到所有已提交的数据呢?听起来似乎更可行些,但一长事务的日志头尾位点涵盖范围可能会包含其他短事务和其他类型的消息,该长事务提交后,服务端就需要读取出完整的日志段并过滤掉不属于它的数据,或者通过磁盘随机读访问长事务所涉及的日志段。这样对读取性能影响是较大的。

因此,Kafka 选择了相对保守的方案,引入 Last Stable Offset(LSO)的概念在客户端实现了 read_committed(读已提交)策略,顾名思义,在该位点之前的消息都是稳定的、已完成的事务或者其他类型t的消息,它代表的是第一个未完成事务的首消息位点,当客户端设置 read_committed 模式后,服务端便只会读取到 LSO 之前的消息。

LSO 的缺点在于该位点后可能存在其他已提交的事务或者其他类型的消息,但受限于它们的位点之前有某个尚在进行中的事务(e.g. 未提交的长事务所涉及的日志段有可能包含数个已提交的短事务),所以无法被返回给客户端,但也因此减轻了客户端的压力(降低 OOM 风险)和确保了服务端的读取效率。

除了利用 LSO 过滤掉未完成的事务,是否有办法高效地直接过滤掉已回滚的事务?虽然有控制消息可以判断事务是否回滚,但如果每次都需要在客户端缓存数据,等消费到控制消息再确认是否丢弃,是否还是效率低了些?

Kafka 为了再进一步减轻客户端的缓存压力,服务端会记录每个分区的已回滚的事务所涉及的日志起止位点,并持久化到后缀名为 .txnindex 的文件中(为了快速恢复)。这样客户端在拉取数据时,服务端会根据其拉取的位点所涉及到的回滚事务,返回一个已回滚事务集合,供客户端提早过滤掉已回滚的消息。

Kafka 如何在流计算场景实现 Exactly-once 机制?

到这里可以大致总结出,Kafka 通过引入幂等生产者 + 事务性机制解决了跨会话的幂等写入和跨分区/多个读写操作的原子性。接下来我们综合这些概念,来看看 Kafka 在流计算场景实现端到端 Exactly-once 的主要流程。

整体流程

我们先介绍下大致的运行流程,再深入各个步骤的细节实现:

事务初始化生产者寻找事务协调者(Transactional Coordinator)。生产者向事务协调者获取 PID。事务开始Consume-Process-Produce:消费者(流处理应用同时作为消费者和生产者)从源 Topic 消费消息并做处理。生产者同步消息所要发往的 Topic-Partition 信息给事务协调者。生产者向目标 Topic-Partition 发送消息。生产者同步提交位点所要发往的 Topic-Partition (内部 Topic __consumer_offsets)信息给事务协调者。生产者通知消费组协调者(Group Coordinator,服务端负责感知消费组变化的 Broker)提交位点(仅持久化位点,但并未更新缓存,因此直到事务提交前对消费者不可见)提交/回滚事务生产者告知事务协调者事务执行结果(提交/回滚)。事务协调者向事务所涉及的分区 Leader 发送控制消息标记事务执行结果,同时事务协调者给生产者响应事务已提交/回滚成功。待所有分区 Leader 将控制消息持久化(任何一个失败都会进行无限重试)后,事务协调者将该事务状态修改为已提交/回滚,事务结束。

这就是 Kafka 实现流计算针对场景实现 Exactly Once 的大体流程,而在我们正式深入每一步骤的实现前,可以先熟悉下 Kafka 的客户端及服务端是怎么对事务状态进行维护的,帮助后面更好地理解细节。

状态转移

首先是服务端的状态管理,总共分为以下几个状态:

EMPTY:事务尚未开始。ONGOING:事务已开始并正在进行中。PREPARE_COMMIT:事务准备提交。PREPARE_ABORT:事务准备回滚。COMPLETE_COMMIT:事务已经提交。COMPLETE_ABORT:事务已经回滚。DEAD:事务 ID 已过期且即将从事务内存中移除。PREPARE_EPOCH_FENCE:处于增加 epoch 数及隔离掉过期生产者的过程中。

而客户端的状态管理则分为以下几种状态:

UNINITIALIZED:事务生产者初始化前的状态,这个阶段没有任何事务的处理。INITIALIZING:事务初始化的过程,包含寻找协调者以及获取 PID 等。READY:对于全新的事务,生产者收到来自协调者事务初始化请求的响应后,或者对于已有的事务,事务流程完成后会将状态置为 READY。IN_TRANSACTION:事务生产者正式发送消息前,将自己的状态置为 IN_TRANSACTION,标识事务开始。COMMITTING_TRANSACTION:事务生产者告知服务端事务提交前,会先将自己更新至该状态。ABORTING_TRANSACTION:事务生产者告知服务端事务回滚前,会先将自己更新至该状态。ABORTABLE_ERROR:事务流程中,如果有数据发送失败/异常,会转换为该状态,再自动回滚事务。FATAL_ERROR:转移到该状态后,再进行状态转移时会抛出异常。

实现细节

接下来,让我们正式深入 Exactly-once 的实现细节,了解这其中的每一步客户端和服务端都做了哪些事。

1. 事务初始化1.a 生产者寻找事务协调者

如同前面所说,事务性机制实现的关键就在于事务协调者,所以整个流程的第一步就是生产者需要找到当前事务所对应的协调者。

生产者调用 initTransactions() 方法后,首先会向服务一台任意一台 Broker (一般选择本地连接最少的)发送 FindCoordinatorRequest,该请求会携带自定义的 Transactional ID (事务 ID)。

服务端则会将事务 ID 的哈希码与 __transaction_state 的分区数(默认为50)取模,得到对应分区 Leader(分区所属的 Broker)会作为协调者的角色。

def partitionFor(transactionalId: String): Int = Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount1.b 生产者向事务协调者获取 PID

找到协调者后,生产者便会向其发送 InitPidRequest 请求分配 PID 以及该 PID 所对应的 epoch。

而协调者则会通过 ProducerIdManager (与协调者一同初始化的对象,负责管理 PID 信息)向 ZooKeeper 申请一个 PID 段(每次申请1000个 PID),再将该 PID 段区间记录到 ZK 的 “/latest_producer_id” 节点中。当所需 PID 大于该节点所记录的区间时,再重新申请新的 PID 段,否则便将下一个可用的 PID 信息返回给协调者。

def generateProducerId(): Long = {   this synchronized {     // 如果分配的 PID 段大于申请的 PID 段区间,便向ZK再申请1000个     if (nextProducerId > currentProducerIdBlock.producerIdEnd) {       allocateNewProducerIdBlock()       nextProducerId = currentProducerIdBlock.producerIdStart     }     nextProducerId += 1 // 下一可用 PID     nextProducerId - 1 // 返回当前分配的 PID   } }

除了给生产者返回 PID + epoch 信息,协调者还会初始化该事务 ID 所对应的事务元数据(e.g. 包括 PID、epoch、Topic-Partition等)并持久化到 __transaction_state 中(此时事务状态置为 EMPTY)。

2. 事务开始

这一步生产者会调用 beginTransaction() 方法,表示事务操作正式开始。可以看到,这里并未有任何的客户端-服务端交互,客户端只是简单的将本地的状态转换为 IN_TRANSACTION。

public synchronized void beginTransaction () {   ensureTransactional();   maybeFailWithError();   // 将生产者的事务状态置换为 IN_TRANSACTION   transitionTo(State.IN_TRANSACTION); }3. Consume-Process-Produce3.a 消费者从源 Topic 消费消息并做处理

Consume-Process-Produce 流程的第一步为应用作为消费者的角色从源 Topic 轮询获取消息,并对获取到的消息做处理后进行发送。

for (ConsumerRecord record : records) {   // 对消费到的消息进行处理   ProducerRecord customizedRecord = transform(record);   producer.send(customizedRecord); }3.b 生产者同步消息所要发往的分区给事务协调者

生产者发送消息会分为两步,首先会把即将要发往的 Topic-Partition 信息先同步给协调者,再正式向 Topic-Partition 发送消息。

我们来看下代码实现,生产者调用 send() 方法时,会将对应 Topic-Partition 信息记录到本地的 TransactionManager,如果发现该 Topic-Partition 之前不存在,则向协调者发送 AddPartitionsToTxnRequest,添加该信息到事务 ID 对应的元数据中并持久化到 __transaction_state 日志。

// 正式发消息前会对事务消息类型做检查,判断是否需要同步 Topic-Partition 信息给协调者 if (transactionManager != null && transactionManager.isTransactional())   transactionManager.maybeAddPartitionToTransaction(tp); public synchronized void maybeAddPartitionToTransaction (TopicPartition topicPartition) {   // 如果分区信息已添加到 TransactionManager 中   if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))     return;   log.debug("Begin adding new partition {} to transaction", topicPartition);   // 添加事务消息的分区信息到待发送分区集合中(发往协调者)   topicPartitionBookkeeper.addPartition(topicPartition);   newPartitionsInTransaction.add(topicPartition); }3.c 生产者向目标 Topic-Partition 发送消息

生产者将携带 Batch 消息的 ProduceRequest 发往指定的 Topic-Partition,这里比较重要的逻辑是会给当前 Batch 消息所对应的 Topic-Partition 设置序列号和事务标识,该序列号会随发往该分区的消息而递增。

synchronized Integer sequenceNumber(TopicPartition topicPartition) {   return topicPartitionBookkeeper.getOrCreatePartition(topicPartition).nextSequence; }

服务端接收到请求后,会验证其携带的 Batch 消息是否重复以及乱序。

def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {   if (batch.producerEpoch != producerEpoch)     None   else     batchWithSequenceRange(batch.baseSequence, batch.lastSequence) } def batchWithSequenceRange(     firstSeq: Int,     lastSeq: Int ): Option[BatchMetadata] = {   // ProducerStateEntry#batchMetadata 属性存放了该生产者最新发送的5个消息批次的序号,如果该集合中存在某个消息批次与请求的消息批次的   // baseSequence(第一条消息的序号)、 lastSequence(最后一条消息的序号)相同,则认为生产者发送的消息批次重复发送,不做处理   val duplicate = batchMetadata.filter { metadata =>     firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq   }   duplicate.headOption } // 判断序列号是否连续 private def inSequence(lastSeq: Int, nextSeq: Int): Boolean = {   nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue) }3.d 生产者同步所要提交的消息位点分区信息给事务协调者

生产者会调用 sendOffsetsToTransaction() 方法进行消费位点的提交,该步骤可视为与发送事务消息一样,一样分为两步。

首先向协调者发送 AddOffsetsToTxnRequest 将即将要发往的 Topic-Partition(提交位点是向 __consumer_offsets 内部 Topic)同步给协调者,再将位点信息发送给消费组协调者(GroupCoordinator,__consumer_offsets 的分区 Leader,会负责记录消费进度)。

public synchronized TransactionalRequestResult sendOffsetsToTransaction(     final Map offsets,     final ConsumerGroupMetadata groupMetadata) {   ...   // 生产者发送 AddOffsetsToTxn 请求给事务协调者,请求中携带了ACK偏移量,事务协调者收到该   // 请求后会将偏移量信息存储到事务状态主题中,生产者收到事务协调者对 AddOffsetsToTxn    // 请求的响应后,再正式提交位点发给GroupCoordinator   log.debug("Begin adding offsets {} for consumer group {} to transaction",       offsets, groupMetadata);   AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(       new AddOffsetsToTxnRequestData()           .setTransactionalId(transactionalId)           .setProducerId(producerIdAndEpoch.producerId)           .setProducerEpoch(producerIdAndEpoch.epoch)           .setGroupId(groupMetadata.groupId()));   AddOffsetsToTxnHandler handler =       new AddOffsetsToTxnHandler(builder, offsets, groupMetadata);   enqueueRequest(handler);   return handler.result; }

协调者在收到该请求后,便跟 3.b 一样,将根据指定 group.id(消费者组 ID) 算出对应的 __consumer_offsets 分区信息(group.id 的哈希码与 __consumer_offsets 的分区数取模)写入到当前事务 ID 所对应的元数据并持久化到 __transaction_state 的日志中。

// 计算消费位点信息要发往哪个 __consumer_offsets 分区 def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount // 将该分区信息添加到事务元数据中(与处理存储分区信息共用 handleAddPartitionsToTransaction 方法), // 并将新的事务元数据写入 __transaction_state 事务状态主题 txnCoordinator.handleAddPartitionsToTransaction(   transactionalId,   addOffsetsToTxnRequest.data.producerId,   addOffsetsToTxnRequest.data.producerEpoch,   Set(offsetTopicPartition),   sendResponseCallback,   requestLocal )3.e 生产者通知消费组协调者提交位点

收到协调者对 AddOffsetsToTxnRequest 的响应后,生产者才会正式向消费组协调者(GroupCoordinator)发送 TxnOffsetCommitRequest 请求正式提交已消费的消息位点信息。

消费组协调者(GroupCoordinator)接收到 TxnOffsetCommitRequest 请求后,会根据 group.id 计算出对应的 __consumer_offsets 分区,并将位点信息(包含 PID 信息)持久化。

需要注意的一点是,此时消费组协调者还不会更新消费进度缓存,所以对消费者(消费时通过接口获取缓存中的消费进度)尚不可见,只有这个事务完成提交后才会将位点信息写入到缓存。

// 生产者收到来自协调者对 AddOffsetsToTxnRequest 请求的响应后, // 即发送TxnOffsetCommit请求给Group Coordinator,请求中携带了已消费位点 pendingRequests.add(txnOffsetCommitHandler(result, offsets, groupMetadata)); // GroupCoordinator 将位点信息持久化至 __consumer_offsets 日志中 private def doTxnCommitOffsets(group: GroupMetadata,                                  memberId: String,                                  groupInstanceId: Option[String],                                  generationId: Int,                                  producerId: Long,                                  producerEpoch: Short,                                  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],                                  requestLocal: RequestLocal,                                  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = { group.inLock {    val validationErrorOpt = validateOffsetCommit(group, generationId, memberId, groupInstanceId,                                                   isTransactional = true)    if (validationErrorOpt.isDefined) {      responseCallback(offsetMetadata.map { case (k, _) => k -> validationErrorOpt.get })     } else {       // 将位点信息进行持久化      groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId,           producerEpoch, requestLocal)     }   } }4. 提交/回滚事务4.a 生产者告知事务协调者事务执行结果

事务操作完成后,生产者会调用 commitTransaction()/abortTransaction() 来提交或回退当前事务,这俩操作都会发送携带提交/回滚标识的 EndTxnRequest 给协调者。

// 切换状态至正在提交事务COMMITTING_TRANSACTION,并发送EndTxnRequest请求给协调者(beginCompletingTransaction), // 事务回滚也是相同流程 public synchronized TransactionalRequestResult beginCommit() {     return handleCachedTransactionRequestResult(() -> {         maybeFailWithError();         transitionTo(State.COMMITTING_TRANSACTION);         return beginCompletingTransaction(TransactionResult.COMMIT);     }, State.COMMITTING_TRANSACTION); } private TransactionalRequestResult beginCompletingTransaction(     TransactionResult transactionResult) {   ...   // 发送EndTxnRequest请求,请求中携带了提交事务或回退事务的标志,这里只关注提交事务的逻辑,协调者收到EndTxnRequest请求后,   // 会将事务状态切换到PREPARE_COMMIT状态,并将该PREPARE_COMMIT状态写入事务状态主题,写入成功后,协调者返回事务提交成功的   // 响应给生产者   if (!(lastError instanceof InvalidPidMappingException)) {     EndTxnRequest.Builder builder = new EndTxnRequest.Builder(         new EndTxnRequestData()             .setTransactionalId(transactionalId)             .setProducerId(producerIdAndEpoch.producerId)             .setProducerEpoch(producerIdAndEpoch.epoch)             .setCommitted(transactionResult.id));     EndTxnHandler handler = new EndTxnHandler(builder);     enqueueRequest(handler);     if (!epochBumpRequired) {       return handler.result;     }   }   // 将状态转换回 INITIALIZING 状态,请求协调者增加 PID 所对应的 epoch,并回到 READY 状态,   // 准备进行下次事务操作   return initializeTransactions(this.producerIdAndEpoch); }

协调者收到 EndTxnRequest 请求后,会更新 __transaction_state 的元数据,将事务状态置为 PREPARE_COMMIT/PREPARE_ABORT。

4.b 事务协调者向事务所涉及的分区 Leader 发送控制消息

待 __transaction_state 元数据更新并持久化成功后,接下来协调者会同时做两件事,分别为向生产者发送执行成功响应(实际还没有),后续事务执行交给协调者来完成,以及向各个事务所涉及的 Topic-Partition 发送 WriteTxnMarkerRequest 请求,标记事务执行结果。

preSendResult match {   case Left(err) =>     info(       s"Aborting sending of transaction markers after appended $txnMarkerResult to transaction log and returning $err error to client for $transactionalId's EndTransaction request"     )     responseCallback(err)   case Right((txnMetadata, newPreSendMetadata)) =>     // 先返回给客户端成功响应,然后再发送marker消息直到broker成功持久化控制消息     responseCallback(Errors.NONE)     // 该方法会为该事务所有事务分区创建一个TxnIdAndMarkerEntry实例(代表一个待发送的 WriteTxnMarkers 请求,目标节点为分区     // Leader),并添加到TransactionMarkerChannelManager#markersQueuePerBroker中     txnMarkerChannelManager.addTxnMarkersToSend(       coordinatorEpoch,       txnMarkerResult,       txnMetadata,       newPreSendMetadata     ) }

判断控制消息(WriteTxnMarkerRequest)是否成功写入到所有目标 Topic-Partition,如果不成功则需不断进行重试,直到所有控制消息都已持久化。

if (!abortSending) {   if (retryPartitions.nonEmpty) {     debug(       s"Re-enqueuing ${txnMarker.transactionResult} transaction markers for transactional id $transactionalId " +         s"under coordinator epoch ${txnMarker.coordinatorEpoch}"     )     // 如果Broker返回处理失败的响应,则协调者重新生成TxnIdAndMarkerEntry实例并添加到TransactionMarkerChannelManager#markersQueuePerBroker队列中,     // 后续协调者会重新发送WriteTxnMarkers请求到该Broker,直到事务中所有的Broker处理成功,该事务才算提交完成     txnMarkerChannelManager.addTxnMarkersToBrokerQueue(       transactionalId,       txnMarker.producerId,       txnMarker.producerEpoch,       txnMarker.transactionResult,       txnMarker.coordinatorEpoch,       retryPartitions.toSet     )   } else {     // 如果Broker返回处理成功的响应     txnMarkerChannelManager.maybeWriteTxnCompletion(transactionalId)   } } 4.c 事务协调者将该事务状态修改为已提交/回滚

待所有控制消息都被持久化到对应的日志文件后,协调者才会将 __transaction_state 中,该事务的状态更新为 COMPLETE_COMMIT/COMPLETE_ABORT,表示该事务已完成,可以将相关的缓存内容清除了,这里只需要保留已完成事务的 PID 及事务完成的时间戳,即可通过事务过期处理机制删除过期的事务 ID & PID 映射。

这边需要特别提下事务回滚操作,其流程跟事务提交大同小异,但回滚除了由客户端主动触发,也有可能由协调者本身来进行触发,比如事务超时的场景,协调者会定时扫描检查是否有超时的事务。

scheduler.schedule(   "transaction-abort",   () => abortTimedOutTransactions(onEndTransactionComplete),   txnConfig.abortTimedOutTransactionsIntervalMs,   txnConfig.abortTimedOutTransactionsIntervalMs )

此外,回滚操作还会向 .txnIndex 文件更新左右已回滚的事务,方便消费端更高效地过滤掉已回滚的数据,不需等待消费到控制消息。

// 检查已完成的事务中是否有需要记录到 .txnindex 文件中的回滚事务 completedTxns.foreach { completedTxn =>   val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)   segment.updateTxnIndex(completedTxn, lastStableOffset)   producerStateManager.completeTxn(completedTxn) } // 如果事务以回滚,便会将事务的起止 offset,以及当前的 LSO 记录到 .txnindex 文件 def updateTxnIndex(completedTxn: CompletedTxn, lastStableOffset: Long): Unit = {   if (completedTxn.isAborted) {     trace(       s"Writing aborted transaction $completedTxn to transaction index, last stable offset is $lastStableOffset"     )     txnIndex.append(new AbortedTxn(completedTxn, lastStableOffset))   } }Kafka 如何处理事务流程中的故障/异常?

Kafka 针对流计算场景实现的 Exactly Once 能力看似很完备,但我们都知道在分布式的世界中,事情往往不会那么美好,所以我们有必要来分析下它是如何应对各个事务执行阶段可能发生的异常或故障的。

生产者故障/重启的 Exactly Once 保障情况?

在事务流程中,生产者主要涉及如下几个阶段:

事务初始化(Empty)生产者发送请求(FindCoordinatorRequest/InitProducerIdRequest)错误/超时:生产者会根据返回的异常信息进行重试/关闭。生产者故障:如果已成功初始化事务元数据,则会通过服务端的事务 ID 超时机制,清除过期事务 ID。生产者重启:重启后的生产者会重新向服务端发送 FindCoordinatorRequest 寻找协调者,并向协调者请求获取 PID,请求会携带与重启前一样的 Transactional ID,如果协调者处在重启前已生成并记录该 Transactional ID 对应的事务状态,则服务端会返回其相对应的 PID + epoch 信息(epoch 会加1),否则重新生成事务元数据并返回 PID + epoch 信息。事务进行中(Empty -> Ongoing)生产者发送请求(AddPartitionToTxnRequest/ProduceRequest/AddOffsetsToTxnRequest/TxnOffsetsCommitRequest)错误/超时:生产者会根据返回的异常信息,主动向协调者发送回滚请求回滚该事务。生产者故障:通过服务端的事务超时机制回滚该事务。生产者重启:重启后会从头开始事务流程(包括事务初始化 + Consume-Process-Produce流程),在事务初始化阶段,服务端会回滚该事务 ID 重启前的事务,该流程会要求生产者等待一段时间后重试,回滚过期事务后才会继续新事务。事务提交/回滚(Ongoing -> PrepareCommit/PrepareAbort -> CompleteCommit/CompleteAbort)生产者发送请求(EndTxnRequest)错误/超时:生产者会根据返回的异常信息,主动回滚事务,或者重试 EndTxnRequest 请求到协调者,而协调者收到该请求后则会发送控制消息到各分区 Leader 来标识事务已完成。生产者故障:服务端如果有成功收到生产者的 EndTxnRequest,则会接管后续提交/回滚流程,向各分区 Leader 发送控制消息。如果服务端没有收到生产者的 EndTxnRequest,则会通过事务超时机制回滚该事务。生产者重启:重启后会从头开始事务流程(包括事务初始化 + Consume-Process-Produce流程),在事务初始化阶段,服务端会回滚该事务 ID 重启前的事务,该流程会要求生产者等待一段时间后重试,回滚过期事务后才会继续新事务。Broker 故障/重启的 Exactly Once 保障情况?

在事务流程中,Broker 主要涉及如下几个阶段:

事务初始化(Empty)协调者发生故障/重启:通过 HA 机制重新选主客户端重试 FindCoordinatorRequest 找到新协调者,并继续后续事务流程。事务进行中(Empty -> Ongoing)协调者发生故障/重启:通过 HA 机制重新选主 + Transaction Log 恢复事务状态(ONGOING),客户端重试 FindCoordinatorRequest 寻找新任新协调者,并继续发送消息。Broker 发生故障/重启:通过 HA 机制重新选主 + PID Snapshot & 日志恢复 PID-消息序列号映射,客户端向对应的分区 Leader 重新发送消息。事务提交/回滚(Ongoing -> PrepareCommit/PrepareAbort -> CompleteCommit/CompleteAbort)协调者发生故障/重启:通过 HA 机制重新选主 + Transaction Log 恢复事务状态(PREPARE_COMMIT/PREPARE_ABORT),新任协调者会继续完成后续事务流程(向各分区 Leader 发送控制消息)。Broker 发生故障/重启:通过 HA 机制重新选主 + PID Snapshot & 日志恢复 PID-消息序列号映射,协调者会不断重试发送控制消息直到其成功写入对应的分区 Leader。事务超时

客户端在事务初始化阶段会设置一个事务超时时间(transaction.timeout.ms,默认60秒),该参数用于限制服务端等待客户端更新事务状态的最大时间,超过该时间则主动回滚当前事务。

对应的服务端则有个最长可允许事务超时时间设置(同样叫 max.transaction.timeout.ms,默认15分钟),客户端的超时时间不可超过服务端该值,否则抛出 InvalidTxnTimeoutException。

除了事务操作超时设置,我们都知道协调者会对事务 ID 的相关元数据进行缓存,为了保证缓存不会被无效占用,所以还有个事务 ID 超时配置 transaction.id.expiration.ms,如果事务 ID 长期(默认7天)无请求发送过来,则会判断该事务 ID 已过期,将该映射进行清除。

参考材料KIP-98 - Exactly Once Delivery and Transactional MessagingExactly Once Delivery and Transactional Messaging in KafkaIdempotent ProducerEnabling Exactly-Once in Kafka StreamsTransactions in Apache KafkaExactly-Once Semantics Are Possible: Here’s How Kafka Does ItKafka事务原理剖析Kafka Exactly-Once 之事务性实现Kafka设计解析(八)- Exactly Once语义与事务机制原理Kafka 事务性之幂等性实现


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3