图解系列 图解Spring Boot 最大连接数及最大并发数

2024-07-09

文章目录 概序架构图TCP的3次握手4次挥手时序图核心参数AcceptCountMaxConnectionsMinSpareThread/MaxThreadMaxKeepAliveRequestsConnectionTimeoutKeepAliveTimeout 内部线程AcceptorPollerTomcatThreadPoolExecutor 测试参考

每个Spring Boot版本和内置容器不同,结果也不同,这里以Spring Boot 2.7.10版本 + 内置Tomcat容器举例。





server: tomcat: # 当所有可能的请求处理线程都在使用中时,传入连接请求的最大队列长度 accept-count: 100 # 服务器在任何给定时间接受和处理的最大连接数。一旦达到限制,操作系统仍然可以接受基于“acceptCount”属性的连接。 max-connections: 8192 threads: # 工作线程的最小数量,初始化时创建的线程数 min-spare: 10 # 工作线程的最大数量 io密集型建议10倍的cpu数,cpu密集型建议cpu数+1,绝大部分应用都是io密集型 max: 200 # 连接器在接受连接后等待显示请求 URI 行的时间。 connection-timeout: 20000 # 在关闭连接之前等待另一个 HTTP 请求的时间。如果未设置,则使用 connectionTimeout。设置为 -1 时不会超时。 keep-alive-timeout: 20000 # 在连接关闭之前可以进行流水线处理的最大HTTP请求数量。当设置为0或1时,禁用keep-alive和流水线处理。当设置为-1时,允许无限数量的流水线处理或keep-alive请求。 max-keep-alive-requests: 100 架构图

当连接数大于maxConnections+acceptCount + 1时,新来的请求不会收到服务器拒绝连接响应,而是不会和新的请求进行3次握手建立连接,一段时间后(客户端的超时时间或者Tomcat的20s后)会出现请求连接超时。



核心参数 AcceptCount



serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset()); // 这里 serverSock.socket().bind(addr,getAcceptCount()); MaxConnections


// 线程的run方法。 public void run() { while (!stopCalled) { // 如果我们已达到最大连接数,等待 connectionLimitLatch.countUpOrAwait(); // 接受来自服务器套接字的下一个传入连接 socket = endpoint.serverSocketAccept() // socket.close 释放的时候 调用 connectionLimitLatch.countDown(); MinSpareThread/MaxThread


// tomcat 启动时 public void createExecutor() { internalExecutor = true; // 容量为Integer.MAX_VALUE TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); // Tomcat扩展的线程池 executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); }



JDK线程池流程:minThreads --> queue --> maxThreads --> ExceptionTomcat增强后: minThreads --> maxThreads --> queue --> Exception MaxKeepAliveRequests



较大的 MaxKeepAliveRequests 值可能会导致服务器上的连接资源被长时间占用。根据您的具体需求,您可以根据服务器的负载和资源配置来调整 MaxKeepAliveRequests 的值,以平衡并发连接和服务器资源的利用率。

NioEndpoint.setSocketOptions socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); Http11Processor.service(SocketWrapperBase socketWrapper) keepAlive = true; while(!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null && sendfileState == SendfileState.DONE && !protocol.isPaused()) { // 默认100 int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests(); if (maxKeepAliveRequests == 1) { keepAlive = false; } else if (maxKeepAliveRequests > 0 && // socketWrapper.decrementKeepAlive() 0 && delta > timeout) { readTimeout = true; } } // Check for write timeout if (!readTimeout && (socketWrapper.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) { long delta = now - socketWrapper.getLastWrite(); long timeout = socketWrapper.getWriteTimeout(); if (timeout > 0 && delta > timeout) { writeTimeout = true; } } KeepAliveTimeout

等待另一个 HTTP 请求的时间,然后关闭连接。当未设置时,将使用 connectionTimeout。当设置为 -1 时,将没有超时。


// Read new bytes if needed if (byteBuffer.position() >= byteBuffer.limit()) { if (keptAlive) { // 还没有读取任何请求数据,所以使用保持活动超时 wrapper.setReadTimeout(keepAliveTimeout); } if (!fill(false)) { // A read is pending, so no longer in initial state parsingRequestLinePhase = 1; return false; } // 至少已收到请求的一个字节 切换到套接字超时。 wrapper.setReadTimeout(connectionTimeout); } 内部线程 Acceptor

Acceptor: 接收器,作用是接受scoket网络请求,并调用setSocketOptions()封装成为NioSocketWrapper,并注册到Poller的events中。注意查看run方法org.apache.tomcat.util.net.Acceptor#run

public void run() { while (!stopCalled) { // 等待下一个请求进来 socket = endpoint.serverSocketAccept(); // 注册socket到Poller,生成PollerEvent事件 endpoint.setSocketOptions(socket); // 向轮询器注册新创建的套接字 - poller.register(socketWrapper); - (SynchronizedQueue(128))events.add(new PollerEvent(socketWrapper)) Poller


public void run() { while (true) { //查询selector取出所有请求事件 Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // 遍历就绪键的集合并调度任何活动事件。 while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); iterator.remove(); NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); // 分配给Executor线程池执行处理请求key if (socketWrapper != null) { processKey(sk, socketWrapper); - processSocket(socketWrapper, SocketEvent.OPEN_READ/SocketEvent.OPEN_WRITE) - executor.execute((Runnable)new SocketProcessor(socketWrapper,SocketEvent)) } } TomcatThreadPoolExecutor



public void createExecutor() { internalExecutor = true; TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); // tomcat自定义线程池 executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); }


// 与 java.util.concurrent.ThreadPoolExecutor 相同,但实现了更高效的getSubmittedCount()方法,用于正确处理工作队列。 // 如果未指定 RejectedExecutionHandler,将配置一个默认的,并且该处理程序将始终抛出 RejectedExecutionException public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor { // 已提交但尚未完成的任务数。这包括队列中的任务和已交给工作线程但后者尚未开始执行任务的任务。 // 这个数字总是大于或等于getActiveCount() 。 private final AtomicInteger submittedCount = new AtomicInteger(0); @Override protected void afterExecute(Runnable r, Throwable t) { if (!(t instanceof StopPooledThreadException)) { submittedCount.decrementAndGet(); } @Override public void execute(Runnable command){ // 提交任务的数量+1 submittedCount.incrementAndGet(); try { // 线程池内部方法,真正执行的方法。就是JDK线程池原生的方法。 super.execute(command); } catch (RejectedExecutionException rx) { // 再次把被拒绝的任务放入到队列中。 if (super.getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super.getQueue(); try { //强制的将任务放入到阻塞队列中 if (!queue.force(command, timeout, unit)) { //放入失败,则继续抛出异常 submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } catch (InterruptedException x) { //被中断也抛出异常 submittedCount.decrementAndGet(); throw new RejectedExecutionException(x); } } else { //不是这种队列,那么当任务满了之后,直接抛出去。 submittedCount.decrementAndGet(); throw rx; } } } /** * 实现Tomcat特有逻辑的自定义队列 */ public class TaskQueue extends LinkedBlockingQueue { private static final long serialVersionUID = 1L; private transient volatile ThreadPoolExecutor parent = null; private static final int DEFAULT_FORCED_REMAINING_CAPACITY = -1; /** * 强制遗留的容量 */ private int forcedRemainingCapacity = -1; /** * 队列的构建方法 */ public TaskQueue() { } public TaskQueue(int capacity) { super(capacity); } public TaskQueue(Collection c) { super(c); } /** * 设置核心变量 */ public void setParent(ThreadPoolExecutor parent) { this.parent = parent; } /** * put:向阻塞队列填充元素,当阻塞队列满了之后,put时会被阻塞。 * offer:向阻塞队列填充元素,当阻塞队列满了之后,offer会返回false。 * * @param o 当任务被拒绝后,继续强制的放入到线程池中 * @return 向阻塞队列塞任务,当阻塞队列满了之后,offer会返回false。 */ public boolean force(Runnable o) { if (parent == null || parent.isShutdown()) { throw new RejectedExecutionException("taskQueue.notRunning"); } return super.offer(o); } /** * 带有阻塞时间的塞任务 */ @Deprecated public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (parent == null || parent.isShutdown()) { throw new RejectedExecutionException("taskQueue.notRunning"); } return super.offer(o, timeout, unit); //forces the item onto the queue, to be used if the task is rejected } /** * 当线程真正不够用时,优先是开启线程(直至最大线程),其次才是向队列填充任务。 * * @param runnable 任务 * @return false 表示向队列中添加任务失败, */ @Override public boolean offer(Runnable runnable) { if (parent == null) { return super.offer(runnable); } //若是达到最大线程数,进队列。 if (parent.getPoolSize() == parent.getMaximumPoolSize()) { return super.offer(runnable); } //当前活跃线程为10个,但是只有8个任务在执行,于是,直接进队列。 if (parent.getSubmittedCount()




