【数据结构篇】请求队列InFlightRequest

您所在的位置:网站首页 inflight翻译 【数据结构篇】请求队列InFlightRequest

【数据结构篇】请求队列InFlightRequest

2023-09-18 15:40| 来源: 网络整理| 查看: 265

1.请求队列简单介绍:

InFlightRequest是client的请求队列。max.in.flight.requests.per.connection配置请求队列大小,默认5,请求队列中存放的是在发送途中的请求,包括:正在发送的请求和已经发送的但还没有接收到response的请求;请求队列满了,发送消息将会发生阻塞。也就是发往同一个node的最大未响应请求数。

具体实现是:

sender线程在发送消息之前会调用NetworkClient的ready检查连接是否准备好了,这个时候会检查请求队列是否可用,详细查看

【网络核心层篇】NetworkClient—检查连接 如果请求队列已经满了,那么会认为连接不可用,将不会发送消息。就会发生阻塞。

如果可用则调用NetworkClient的send方法,这个时候会将请求入队。

在NetworkClient的poll中handleCompletedSends(responses, updatedNow);处理上次完成的请求,这个时候会从请求队列中移除已经收到响应的请求。

图解: 在这里插入图片描述

2.主要的成员 /** * The set of requests which have been sent or are being sent but haven't yet received a responseq * 请求队列 * 1. 正在发送的请求 * 2. 已经发送的但还没有接收到response的请求; */ final class InFlightRequests { /** * 每个连接最大执行中请求数-->max.in.flight.requests.per.connection */ private final int maxInFlightRequestsPerConnection; /** * node->Deque * 双端队列Deque,其中的元素请求ClientRequest * 新请求从队列头部插入。发送从队列尾部取出 */ private final Map requests = new HashMap(); /** * Thread safe total number of in flight requests. */ private final AtomicInteger inFlightRequestCount = new AtomicInteger(0); 3.核心API: 3.1请求入队 /** * 请求入队 */ public void add(NetworkClient.InFlightRequest request) { // 目的地 String destination = request.destination; // 获取目的地请求队列,没有则新建一个ArrayDeque Deque reqs = this.requests.get(destination); if (reqs == null) { reqs = new ArrayDeque(); this.requests.put(destination, reqs); } // 向头部添加请求 reqs.addFirst(request); // 请求消息个数+1, inFlightRequestCount.incrementAndGet(); } 3.2请求出队 /** * 请求出队 */ public NetworkClient.InFlightRequest completeNext(String node) { // 从指定的请求队列的尾部取出一个请求 NetworkClient.InFlightRequest inFlightRequest = requestQueue(node).pollLast(); // 请求数量-1 inFlightRequestCount.decrementAndGet(); return inFlightRequest; } /** * 获取指定noded的请求队列 */ private Deque requestQueue(String node) { Deque reqs = requests.get(node); if (reqs == null || reqs.isEmpty()) throw new IllegalStateException("There are no in-flight requests for node " + node); return reqs; } 3.3请求队列校验 /** * Can we send more requests to this node? * * 校验请求队列是否满了 * * @param node Node in question * @return true iff we have no requests still being sent to the given node * * 重点在于queue.peekFirst().request().completed, 即如果发给这个节点的最早的请求还没有发送完成,是不能再往这个节点发送请求的。 * * 从canSendMore方法中也可以看出, 只要没有超过maxInFlightRequestsPerConnection,一个node可以有多个in-flight request的 * * maxInFlightRequestsPerConnection max.in.flight.requests.per.connection */ public boolean canSendMore(String node) { Deque queue = requests.get(node); return queue == null || queue.isEmpty() || (queue.peekFirst().send.completed() && queue.size()


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3