Netty对HTTP2流控的支持

您所在的位置:网站首页 http2流量控制 Netty对HTTP2流控的支持

Netty对HTTP2流控的支持

2024-07-01 21:55| 来源: 网络整理| 查看: 265

前言

流量控制是HTTP/2的一项重要功能,它允许发送方根据接收方的处理能力来控制数据的传输速率。通过合理的流控机制,可以确保服务器和客户端之间的通信不会出现拥塞或资源浪费。

HTTP/2中的流控通过两个机制实现:

流量控制窗口(Flow Control Window):每个HTTP/2连接都会有一个流量控制窗口,用于控制接收方可以接受的数据量。发送方在发送数据之前必须检查接收方的流量控制窗口大小,并确保发送的数据不会超过接收方的处理能力。接收方可以通过发送WINDOW_UPDATEFrame来调整窗口的大小。流量控制体系(Stream Prioritization and Flow Control):HTTP/2中的多个流(Stream)可以同时在一个连接上进行传输。每个流都有自己的流量控制窗口,用于控制该流的数据传输速率。发送方必须根据每个流的窗口大小来确定哪些流应该优先传输数据。通过优化流的优先级和流量控制窗口的调整,可以实现更有效的数据传输。

其中,流量控制窗口的实现较为简单,流优先级的控制则需要一套复杂的算法来支撑,好在Netty已经帮我们实现了。

Http2FlowController

io.netty.handler.codec.http2.Http2FlowController是Netty抽象出来的流控接口,下面有两个分支Http2LocalFlowController和Http2RemoteFlowController,分别针对inbound和outbound数据处理。 image.png HTTP2的流控采用滑动窗口设计,Http2FlowController主要是定义了对Stream窗口的读写。

public interface Http2FlowController { // 设置上下文 void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception; // 初始窗口大小 void initialWindowSize(int newWindowSize) throws Http2Exception; // 获取初试窗口 int initialWindowSize(); // 获取流的窗口大小 int windowSize(Http2Stream stream); // 调整流的窗口大小 void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception; }

Http2LocalFlowController是针对入站数据的流控,主要职责有:

收到被流控的Frame时,递减窗口,窗口小于下限时抛异常消费被流控的Frame后,在适当时机给对方发送WINDOW_UPDATEFrame,调整窗口大小 public interface Http2LocalFlowController extends Http2FlowController { // 设置FrameWrite,用于发送WINDOW_UPDATE Frame Http2LocalFlowController frameWriter(Http2FrameWriter frameWriter); // 接收流控Frame 递减窗口大小 void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception; // 已消费数据 适当实际发送WINDOW_UPDATE Frame boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception; // 获取流还未消费的数据大小 int unconsumedBytes(Http2Stream stream); // 获取流的初始窗口 int initialWindowSize(Http2Stream stream); }

Http2RemoteFlowController是针对出站数据的流控,主要职责有:

发送被流控的Frame前,确保对方有可用的窗口大小对方窗口大小为0时,Frame排队等待Stream可写状态发生变更时,触发监听器维护Stream的优先级和依赖关系 public interface Http2RemoteFlowController extends Http2FlowController { // 获取Channel上下文 ChannelHandlerContext channelHandlerContext(); // 对流控Frame排队 void addFlowControlled(Http2Stream stream, FlowControlled payload); // 流是否有排队的Frame boolean hasFlowControlled(Http2Stream stream); // 写入挂起字节 即排队的Frame void writePendingBytes() throws Http2Exception; // 设置监听器 void listener(Listener listener); // 流是否可写 即窗口>挂起字节 boolean isWritable(Http2Stream stream); // 可写状态变更 void channelWritabilityChanged() throws Http2Exception; // 更新流的依赖关系树 void updateDependencyTree(int childStreamId, int parentStreamId, short weight, boolean exclusive); } 初始窗口

HTTP2连接建立后,发送的第一个Frame一定是SETTINGSFrame,用来对连接进行配置。其中一个属性SETTINGS_INITIAL_WINDOW_SIZE用来设置Stream的初始窗口大小,如果没指定则使用默认值65535个字节。

io.netty.handler.codec.http2.Http2FrameCodec是Netty提供的HTTP2 Frame编解码器,构建该组件时可以配置默认Settings,如下示例将初始窗口大小设为10。

Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forServer() .initialSettings(Http2Settings.defaultSettings().initialWindowSize(10)) .build();

当有新连接建立时,会触发ChannelInboundHandler#channelActive()事件,Http2FrameCodec此时会基于配置的Settings来发送第一个SETTINGSFrame,方法是Http2ConnectionHandler.PrefaceDecoder#sendPreface():

private void sendPreface(ChannelHandlerContext ctx) throws Exception { if (prefaceSent || !ctx.channel().isActive()) { return; } prefaceSent = true; final boolean isClient = !connection().isServer(); if (isClient) { // 客户端发送的第一个消息是一段魔法字符串 PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n ctx.write(connectionPrefaceBuf()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } // 发送SETTINGS Frame encoder.writeSettings(ctx, initialSettings, ctx.newPromise()).addListener( ChannelFutureListener.CLOSE_ON_FAILURE); if (isClient) { userEventTriggered(ctx, Http2ConnectionPrefaceAndSettingsFrameWrittenEvent.INSTANCE); } }

对方在接收到SETTINGSFrame后,就会对流控进行配置。 读取SETTINGSFrame的代码在DefaultHttp2FrameReader#readSettingsFrame(),每个Setting占用6个字节,包含2字节的Key和4字节的Value。

private void readSettingsFrame(ChannelHandlerContext ctx, ByteBuf payload, Http2FrameListener listener) throws Http2Exception { if (flags.ack()) {// 对方回复的ack listener.onSettingsAckRead(ctx); } else { // 长度除以6 就是Setting个数 int numSettings = payloadLength / SETTING_ENTRY_LENGTH; Http2Settings settings = new Http2Settings(); for (int index = 0; index 0 && isChannelWritable()) { writePendingBytes(); } }

每个Stream都有一个属于自己的FlowState用来处理流控,字段window记录当前Stream的可用窗口大小,一开始window值是0,接收到初始窗口其实就是递增窗口大小。

int incrementStreamWindow(int delta) throws Http2Exception { if (delta > 0 && Integer.MAX_VALUE - delta 0 && Integer.MAX_VALUE - delta


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3