kafka消费者

您所在的位置:网站首页 kafka消费者poll kafka消费者

kafka消费者

2023-08-17 19:26| 来源: 网络整理| 查看: 265

1、KafkaConsumer poll 详解

消息拉起主要入口为:KafkaConsumer#poll方法,其声明如下:

public ConsumerRecords poll(final Duration timeout) { // @1 return poll(time.timer(timeout), true); // @2 }

代码@1:参数为超时时间,使用 java 的 Duration 来定义。 代码@2:调用内部的 poll 方法。

KafkaConsumer#poll

private ConsumerRecords poll(final Timer timer, final boolean includeMetadataInTimeout) { // @1 acquireAndEnsureOpen(); // @2 try { if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { // @3 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); } // poll for new data until the timeout expires do {// @4 client.maybeTriggerWakeup(); //@5 if (includeMetadataInTimeout) { // @6 if (!updateAssignmentMetadataIfNeeded(timer)) { return ConsumerRecords.empty(); } } else { while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) { log.warn("Still waiting for metadata"); } } final Map records = pollForFetches(timer); // @7 if (!records.isEmpty()) { if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { // @8 client.pollNoWakeup(); } return this.interceptors.onConsume(new ConsumerRecords(records)); // @9 } } while (timer.notExpired()); return ConsumerRecords.empty(); } finally { release(); } }

代码@1:首先先对其参数含义进行讲解。

boolean includeMetadataInTimeout 拉取消息的超时时间是否包含更新元数据的时间,默认为true,即包含。

代码@2:检查是否可以拉取消息,其主要判断依据如下:

KafkaConsumer 是否有其他线程再执行,如果有,则抛出异常,因为 - KafkaConsumer 是线程不安全的,同一时间只能一个线程执行。KafkaConsumer 没有被关闭。

代码@3:如果当前消费者未订阅任何主题或者没有指定队列,则抛出错误,结束本次消息拉取。

代码@4:使用 do while 结构循环拉取消息,直到超时或拉取到消息。

代码@5:避免在禁止禁用wakeup时,有请求想唤醒时则抛出异常,例如在下面的@8时,会禁用wakeup。

代码@6:更新相关元数据,为真正向 broker 发送消息拉取请求做好准备,该方法将在下面详细介绍,现在先简单介绍其核心实现点:

如有必要,先向 broker 端拉取最新的订阅信息(包含消费组内的在线的消费客户端)。执行已完成(异步提交)的 offset 提交请求的回调函数。维护与 broker 端的心跳请求,确保不会被“踢出”消费组。更新元信息。如果是自动提交消费偏移量,则自动提交偏移量。更新各个分区下次待拉取的偏移量。

这里会有一个更新元数据是否占用消息拉取的超时时间,默认为 true。

代码@7:调用 pollForFetches 向broker拉取消息,该方法将在下文详细介绍。

代码@8:如果拉取到的消息集合不为空,再返回该批消息之前,如果还有挤压的拉取请求,可以继续发送拉取请求,但此时会禁用warkup,主要的目的是用户在处理消息时,KafkaConsumer 还可以继续向broker 拉取消息。

代码@9:执行消费拦截器。

接下来对上文提到的代码@6、@7进行详细介绍。

1.1 KafkaConsumer updateAssignmentMetadataIfNeeded 详解

KafkaConsumer#updateAssignmentMetadataIfNeeded

boolean updateAssignmentMetadataIfNeeded(final Timer timer) { if (coordinator != null && !coordinator.poll(timer)) { // @1 return false; } return updateFetchPositions(timer); // @2 }

要理解这个方法实现的用途,我们就必须依次对 coordinator.poll 方法与 updateFetchPositions 方法。

1.1.1 ConsumerCoordinator#poll

public boolean poll(Timer timer) { invokeCompletedOffsetCommitCallbacks(); // @1 if (subscriptions.partitionsAutoAssigned()) { // @2 pollHeartbeat(timer.currentTimeMs()); // @21 if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { //@22 return false; } if (rejoinNeededOrPending()) { // @23 if (subscriptions.hasPatternSubscription()) { // @231 if (this.metadata.timeToAllowUpdate(time.milliseconds()) == 0) { this.metadata.requestUpdate(); } if (!client.ensureFreshMetadata(timer)) { return false; } } if (!ensureActiveGroup(timer)) { // @232 return false; } } } else { // @3 if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { client.awaitMetadataUpdate(timer); } } maybeAutoCommitOffsetsAsync(timer.currentTimeMs()); // @4 return true; }

代码@1:执行已完成的 offset (消费进度)提交请求的回调函数。

代码@2:队列负载算法为自动分配(即 Kafka 根据消费者个数与分区书动态负载分区)的相关的处理逻辑。其实现关键点如下:

代码@21:更新发送心跳相关的时间,例如heartbeatTimer、sessionTimer、pollTimer 分别代表发送最新发送心跳的时间、会话最新活跃时间、最新拉取消息。代码@22:如果不存在协调器或协调器已断开连接,则返回 false,结束本次拉取。如果协调器就绪,则继续往下走。代码@23:判断是否需要触发重平衡,即消费组内的所有消费者重新分配topic中的分区信息,例如元数据发送变化,判断是否需要重新重平衡的关键点如下: 如果队列负载是通过用户指定的,则返回 false,表示无需重平衡。如果队列是自动负载,topic 队列元数据发生了变化,则需要重平衡。如果队列是自动负载,订阅关系发生了变化,则需要重平衡。 如果需要重重平衡,则同步更新元数据,此过程会阻塞。详细的重平衡将单独重点介绍,这里暂时不深入展开。

代码@3:用户手动为消费组指定负载的队列的相关处理逻辑,其实现关键如下:

如果需要更新元数据,并且还没有分区准备好,则同步阻塞等待元数据更新完毕。

代码@4:如果开启了自动提交消费进度,并且已到下一次提交时间,则提交。Kafka 消费者可以通过设置属性 enable.auto.commit 来开启自动提交,该参数默认为 true,则默认会每隔 5s 提交一次消费进度,提交间隔可以通过参数 auto.commit.interval.ms 设置。

接下来继续探讨 updateAssignmentMetadataIfNeeded (更新元数据)的第二个步骤,更新拉取位移。

1.1.2 updateFetchPositions 详解

KafkaConsumer#updateFetchPositions

private boolean updateFetchPositions(final Timer timer) { cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions(); if (cachedSubscriptionHashAllFetchPositions) { // @1 return true; } if (coordinator != null && !coordinator.refreshCommittedOffsetsIfNeeded(timer)) // @2 return false; subscriptions.resetMissingPositions(); // @3 fetcher.resetOffsetsIfNeeded(); // @4 return true; }

代码@1:如果订阅关系中的所有分区都有有效的位移,则返回 true。

代码@2:如果存在任意一个分区没有有效的位移信息,则需要向 broker 发送请求,从broker 获取该消费组,该分区的消费进度。相关的实现细节将在后续文章【Kafka 消费进度】专题文章中详细介绍。

代码@3:如果经过第二步,订阅关系中还某些分区还是没有获取到有效的偏移量,则使用偏移量重置策略进行重置,如果未配置,则抛出异常。

代码@4:发送一个异步请求去重置那些正等待重置位置的分区。有关 Kafka 消费消费进度、重平衡等知识将会在后续文章中深入探讨,本文只需了解 poll 消息的核心处理流程。

从 KafkaConsumer#poll 中流程可以看到,通过 updateAssignmentMetadataIfNeeded 对元数据、重平衡,更新拉取偏移量等工作处理完成后,下一步就是需要向 broker 拉取消息了,其实现入口为:KafkaConsumer 的 pollForFetches 方法。

1.2 消息拉取

KafkaConsumer#pollForFetches

private Map pollForFetches(Timer timer) { long pollTimeout = coordinator == null ? timer.remainingMs() : Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs()); // @1 // if data is available already, return it immediately final Map records = fetcher.fetchedRecords(); // @2 if (!records.isEmpty()) { return records; } fetcher.sendFetches(); // @3 // We do not want to be stuck blocking in poll if we are missing some positions // since the offset lookup may be backing off after a failure // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call // updateAssignmentMetadataIfNeeded before this method. if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) { // @4 pollTimeout = retryBackoffMs; } Timer pollTimer = time.timer(pollTimeout); client.poll(pollTimer, () -> { return !fetcher.hasCompletedFetches(); }); // @5 timer.update(pollTimer.currentTimeMs()); // @6 if (coordinator != null && coordinator.rejoinNeededOrPending()) { // @7 return Collections.emptyMap(); } return fetcher.fetchedRecords(); // @8 }

代码@1:计算本次拉取的超时时间,其计算逻辑如下:

如果协调器为空,则返回当前定时器剩余时间即可。如果协调器不为空,其逻辑较为复杂,为下面返回的超时间与当前定时器剩余时间相比取最小值。如果不开启自动提交位移并且未加入消费组,则超时时间为Long.MAX_VALUE。如果不开启自动提交位移并且已加入消费组,则返回距离下一次发送心跳包还剩多少时间。如果开启自动提交位移,则返回 距离下一次自动提交位移所需时间 与 距离下一次发送心跳包所需时间 之间的最小值。

代码@2:如果数据已经拉回到本地,直接返回数据。将在下文详细介绍 Fetcher 的 fetchedRecords 方法。

代码@3:组装发送请求,并将存储在待发送请求列表中。

代码@4:如果已缓存的分区信息中存在某些分区缺少偏移量,如果拉取的超时时间大于失败重试需要阻塞的时间,则更新此次拉取的超时时间为失败重试需要的间隔时间,主要的目的是不希望在 poll 过程中被阻塞【后续会详细介绍 Kafka 拉取消息的线程模型,再来回顾一下这里】。

代码@5:通过调用NetworkClient 的 poll 方法发起消息拉取操作(触发网络读写)。

代码@6:更新本次拉取的时间。

代码@7:检查是需要重平衡。

代码@8:将从 broker 读取到的数据返回(即封装成消息)。

从上面消息拉取流程来看,有几个比较重要的方法,例如 Fetcher 类相关的方法,NetworkClient 的 poll 方法,那我们接下来来重点探讨。

我们先用一张流程图总结一下消息拉取的全过程:

在这里插入图片描述

 

接下来我们将重点看一下 KafkaConsumer 的 pollForFetches 详细过程,也就是需要详细探究 Fetcher 类的实现细节。

 

2、Fetcher 类详解

Fetcher 封装消息拉取的方法,可以看成是消息拉取的门面类。

2.1 类图

 

在这里插入图片描述

 

我们首先一一介绍一下 Fetcher 的核心属性与核心方法。

 

ConsumerNetworkClient client 消费端网络客户端,Kafka 负责网络通讯实现类。int minBytes 一次消息拉取需要拉取的最小字节数,如果不组,会阻塞,默认值为1字节,如果增大这个值会增大吞吐,但会增加延迟,可以通参数 fetch.min.bytes 改变其默认值。int maxBytes 一次消息拉取允许拉取的最大字节数,但这不是绝对的,如果一个分区的第一批记录超过了该值,也会返回。默认为50M,可通过参数 fetch.max.bytes 改变其默认值。同时不能超过 broker的配置参数(message.max.bytes) 和 主题级别的配置(max.message.bytes)。int maxWaitMs 在 broker 如果符合拉取条件的数据小于 minBytes 时阻塞的时间,默认为 500ms ,可通属性 fetch.max.wait.ms 进行定制。int fetchSize 每一个分区返回的最大消息字节数,如果分区中的第一批消息大于 fetchSize 也会返回。long retryBackoffMs 失败重试后需要阻塞的时间,默认为 100 ms,可通过参数 retry.backoff.ms 定制。long requestTimeoutMs 客户端向 broker 发送请求最大的超时时间,默认为 30s,可以通过 request.timeout.ms 参数定制。int maxPollRecords 单次拉取返回的最大记录数,默认值 500,可通过参数 max.poll.records 进行定制。boolean checkCrcs 是否检查消息的 crcs 校验和,默认为 true,可通过参数 check.crcs 进行定制。Metadata metadata 元数据。FetchManagerMetrics sensors 消息拉取的统计服务类。SubscriptionState subscriptions 订阅信息状态。ConcurrentLinkedQueue< CompletedFetch> completedFetches 已完成的 Fetch 的请求结果,待消费端从中取出数据。Deserializer< K> keyDeserializer key 的反序列化器。Deserializer< V> valueDeserializer value 的饭序列化器。IsolationLevel isolationLevel Kafka的隔离级别(与事务消息相关),后续在研究其事务相关时再进行探讨。Map sessionHandlers 拉取会话监听器。

接下来我们将按照消息流程,一起来看一下 Fetcher 的核心方法。

2.2 Fetcher 核心方法

2.2.1 Fetcher#fetchedRecords

Fetcher#fetchedRecords

public Map fetchedRecords() { Map fetched = new HashMap(); // @1 int recordsRemaining = maxPollRecords; try { while (recordsRemaining > 0) { // @2 if (nextInLineRecords == null || nextInLineRecords.isFetched) { // @3 CompletedFetch completedFetch = completedFetches.peek(); if (completedFetch == null) break; try { nextInLineRecords = parseCompletedFetch(completedFetch); } catch (Exception e) { FetchResponse.PartitionData partition = completedFetch.partitionData; if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) { completedFetches.poll(); } throw e; } completedFetches.poll(); } else { // @4 List records = fetchRecords(nextInLineRecords, recordsRemaining); TopicPartition partition = nextInLineRecords.partition; if (!records.isEmpty()) { List currentRecords = fetched.get(partition); if (currentRecords == null) { fetched.put(partition, records); } else { List newRecords = new ArrayList(records.size() + currentRecords.size()); newRecords.addAll(currentRecords); newRecords.addAll(records); fetched.put(partition, newRecords); } recordsRemaining -= records.size(); } } } } catch (KafkaException e) { if (fetched.isEmpty()) throw e; } return fetched; }

代码@1:首先先解释两个局部变量的含义:

Map fetched 按分区存放已拉取的消息,返回给客户端进行处理。recordsRemaining:剩余可拉取的消息条数。

代码@2:循环去取已经完成了 Fetch 请求的消息,该 while 循环有两个跳出条件:

如果拉取的消息已经达到一次拉取的最大消息条数,则跳出循环。缓存中所有拉取结果已处理。

代码@3、@4 主要完成从缓存中解析数据的两个步骤,初次运行的时候,会进入分支@3,然后从 调用 parseCompletedFetch 解析成 PartitionRecords 对象,然后代码@4的职责就是从解析 PartitionRecords ,将消息封装成 ConsumerRecord,返回给消费端线程处理。

代码@3的实现要点如下:

首先从 completedFetches (Fetch请求的返回结果) 列表中获取一个 Fetcher 请求,主要使用的 Queue 的 peek()方法,并不会从该队列中移除该元素。然后调用 parseCompletedFetch 对处理结果进行解析返回 PartitionRecords。处理成功后,调用 Queue 的方法将已处理过的 Fetcher结果移除。

从上面可知,上述方法的核心方法是:parseCompletedFetch。

代码@4的实现要点无非就是调用 fetchRecords 方法,按分区组装成 Map,供消费者处理,例如供业务处理。

接下来将重点探讨上述两个方法的实现细节。

2.2.1.1 Fetcher#parseCompletedFetch

在尝试探讨该方法之前,我们首先对其入参进行一个梳理,特别是先认识其主要数据结构。

1、CompletedFetch 相关类图

在这里插入图片描述

 

从上图可以看出,CompleteFetch 核心属性主要如下:

 

TopicPartition partition 分区信息,返回结果都是以分区为纬度。long fetchedOffset 本次拉取的开始偏移量。FetchResponse.PartitionData partitionData 返回的分区数据。FetchResponseMetricAgregator metricAggregator 统计指标相关。short responseVersion broker 端的版本号。

分区的数据是使用 PartitionData 来进行封装的。我们也来简单的了解一下其内部数据结果。

Errors error 分区拉取的相应结果,Errors.NONE 表示请求成功。long highWatermark broker 端关于该分区的高水位线,即小于该偏移量的消息对于消费端是可见的。long lastStableOffset 分区中小于该偏移量的消息的事务状态已得到确认,要么是已提交,要么是已回滚,与事务相关,后面会专门探讨。List< AbortedTransaction> abortedTransactions 已拒绝的事物。T records 分区数据,是 BaseRecords 的子类。

2、parseCompletedFetch 详解

private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) { TopicPartition tp = completedFetch.partition; FetchResponse.PartitionData partition = completedFetch.partitionData; long fetchOffset = completedFetch.fetchedOffset; PartitionRecords partitionRecords = null; Errors error = partition.error; try { if (!subscriptions.isFetchable(tp)) { // @1 log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp); } else if (error == Errors.NONE) { // @2 Long position = subscriptions.position(tp); if (position == null || position != fetchOffset) { // @21 log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " + "the expected offset {}", tp, fetchOffset, position); return null; } log.trace("Preparing to read {} bytes of data for partition {} with offset {}", partition.records.sizeInBytes(), tp, position); Iterator


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3