A Look at the HikariCP Connection Pool's leakDetectionThreshold


2023-09-10 08:32 | Source: compiled from the web

This article looks at the HikariCP connection pool's leakDetectionThreshold setting, i.e. connection leak detection.

leakDetectionThreshold

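Sets how long a connection may be held out of the pool before it is reported as a possible leak, in milliseconds. The default is 0, which disables connection leak detection. If the value is greater than 0 and the pool is not running in a unit test, a further check is applied: if (leakDetectionThreshold < SECONDS.toMillis(2) || (leakDetectionThreshold > maxLifetime && maxLifetime > 0)), the value is reset to 0. In other words, for the setting to take effect it must be greater than 0, must not be less than 2 seconds, and, when maxLifetime > 0, must not exceed maxLifetime. maxLifetime defaults to 1800000, i.e. 30 minutes.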
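The rule above can be sketched as a small pure function. This is an illustrative restatement, not HikariCP's own code: the class and method names are hypothetical, the unit-test exemption is left out, and treating non-positive values as "disabled" is an assumption of the sketch.

```java
import java.util.concurrent.TimeUnit;

/** Illustrative sketch of the validation rule described above; not HikariCP's own code. */
final class LeakThresholdRule {
    /** Smallest accepted threshold: 2 seconds, expressed in milliseconds. */
    static final long MIN_THRESHOLD_MS = TimeUnit.SECONDS.toMillis(2);

    private LeakThresholdRule() {}

    /**
     * Returns the threshold that would actually be used, or 0 (disabled)
     * when the requested value falls outside the accepted range.
     */
    static long effectiveThreshold(long requestedMs, long maxLifetimeMs) {
        if (requestedMs <= 0) {
            return 0; // assumption of this sketch: anything non-positive means "off"
        }
        boolean tooSmall = requestedMs < MIN_THRESHOLD_MS;
        // maxLifetime only acts as an upper bound when it is itself positive
        boolean beyondLifetime = maxLifetimeMs > 0 && requestedMs > maxLifetimeMs;
        return (tooSmall || beyondLifetime) ? 0 : requestedMs;
    }

    public static void main(String[] args) {
        long maxLifetime = TimeUnit.MINUTES.toMillis(30); // 1800000
        System.out.println(effectiveThreshold(1_000, maxLifetime));     // below 2 seconds
        System.out.println(effectiveThreshold(5_000, maxLifetime));     // within range
        System.out.println(effectiveThreshold(2_000_000, maxLifetime)); // above maxLifetime
    }
}
```

With a 30-minute maxLifetime, only the middle call keeps its value; the other two fall back to 0.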

HikariPool.getConnection

HikariCP-2.7.6-sources.jar!/com/zaxxer/hikari/pool/HikariPool.java

    /**
     * Get a connection from the pool, or timeout after the specified number of milliseconds.
     *
     * @param hardTimeout the maximum time to wait for a connection from the pool
     * @return a java.sql.Connection instance
     * @throws SQLException thrown if a timeout occurs trying to obtain a connection
     */
    public Connection getConnection(final long hardTimeout) throws SQLException {
        suspendResumeLock.acquire();
        final long startTime = currentTime();

        try {
            long timeout = hardTimeout;
            do {
                PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
                if (poolEntry == null) {
                    break; // We timed out... break and throw exception
                }

                final long now = currentTime();
                if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > ALIVE_BYPASS_WINDOW_MS && !isConnectionAlive(poolEntry.connection))) {
                    closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
                    timeout = hardTimeout - elapsedMillis(startTime);
                } else {
                    metricsTracker.recordBorrowStats(poolEntry, startTime);
                    return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
                }
            } while (timeout > 0L);

            metricsTracker.recordBorrowTimeoutStats(startTime);
            throw createTimeoutException(startTime);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
        } finally {
            suspendResumeLock.release();
        }
    }

Note that when getConnection returns, it calls poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now). In other words, the proxy connection is created with a ProxyLeakTask attached to it.

leakTaskFactory

HikariCP-2.7.6-sources.jar!/com/zaxxer/hikari/pool/HikariPool.java

    /**
     * Construct a HikariPool with the specified configuration.
     *
     * @param config a HikariConfig instance
     */
    public HikariPool(final HikariConfig config) {
        super(config);

        this.connectionBag = new ConcurrentBag<>(this);
        this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;

        this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();

        checkFailFast();

        if (config.getMetricsTrackerFactory() != null) {
            setMetricsTrackerFactory(config.getMetricsTrackerFactory());
        } else {
            setMetricRegistry(config.getMetricRegistry());
        }

        setHealthCheckRegistry(config.getHealthCheckRegistry());

        registerMBeans(this);

        ThreadFactory threadFactory = config.getThreadFactory();

        LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(config.getMaximumPoolSize());
        this.addConnectionQueue = unmodifiableCollection(addConnectionQueue);
        this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy());
        this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

        this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);

        this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, HOUSEKEEPING_PERIOD_MS, MILLISECONDS);
    }

    /**
     * Create/initialize the Housekeeping service {@link ScheduledExecutorService}. If the user specified an Executor
     * to be used in the {@link HikariConfig}, then we use that. If no Executor was specified (typical), then create
     * an Executor and configure it.
     *
     * @return either the user specified {@link ScheduledExecutorService}, or the one we created
     */
    private ScheduledExecutorService initializeHouseKeepingExecutorService() {
        if (config.getScheduledExecutor() == null) {
            final ThreadFactory threadFactory = Optional.ofNullable(config.getThreadFactory()).orElse(new DefaultThreadFactory(poolName + " housekeeper", true));
            final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
            executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            executor.setRemoveOnCancelPolicy(true);
            return executor;
        } else {
            return config.getScheduledExecutor();
        }
    }

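Note that the constructor initializes both leakTaskFactory and houseKeepingExecutorService, and that the leak task factory is handed the housekeeping executor.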
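One detail in the listing above is executor.setRemoveOnCancelPolicy(true). A plausible reading, which is an inference rather than something the source states, is that it keeps cancelled leak tasks from piling up in the executor's queue until their delay expires. The sketch below uses only the JDK's ScheduledThreadPoolExecutor to show the effect of that flag; the class and method names are hypothetical.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Illustrative only: shows what setRemoveOnCancelPolicy changes for cancelled delayed tasks. */
final class RemoveOnCancelSketch {
    private RemoveOnCancelSketch() {}

    /**
     * Schedules one far-future task, cancels it, and reports how many entries
     * are still sitting in the executor's queue afterwards.
     */
    static int queuedAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            // The delay is long enough that the task cannot run during the demo
            ScheduledFuture<?> future = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
            future.cancel(false);
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }
}
```

With the policy enabled the cancelled task leaves the queue at once; with the JDK default it stays queued until its delay elapses. For a pool that schedules one task per borrowed connection and cancels nearly all of them, that difference adds up.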

ProxyLeakTaskFactory

HikariCP-2.7.6-sources.jar!/com/zaxxer/hikari/pool/ProxyLeakTaskFactory.java

    /**
     * A factory for {@link ProxyLeakTask} Runnables that are scheduled in the future to report leaks.
     *
     * @author Brett Wooldridge
     * @author Andreas Brenk
     */
    class ProxyLeakTaskFactory {
        private ScheduledExecutorService executorService;
        private long leakDetectionThreshold;

        ProxyLeakTaskFactory(final long leakDetectionThreshold, final ScheduledExecutorService executorService) {
            this.executorService = executorService;
            this.leakDetectionThreshold = leakDetectionThreshold;
        }

        ProxyLeakTask schedule(final PoolEntry poolEntry) {
            return (leakDetectionThreshold == 0) ? ProxyLeakTask.NO_LEAK : scheduleNewTask(poolEntry);
        }

        void updateLeakDetectionThreshold(final long leakDetectionThreshold) {
            this.leakDetectionThreshold = leakDetectionThreshold;
        }

        private ProxyLeakTask scheduleNewTask(PoolEntry poolEntry) {
            ProxyLeakTask task = new ProxyLeakTask(poolEntry);
            task.schedule(executorService, leakDetectionThreshold);

            return task;
        }
    }

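Note that when leakDetectionThreshold is 0, i.e. connection leak detection is disabled, schedule returns ProxyLeakTask.NO_LEAK. Otherwise it creates a new ProxyLeakTask that is scheduled to fire after leakDetectionThreshold milliseconds.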
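The factory's dispatch is an instance of the null-object pattern: callers always get something they can cancel, and the disabled case costs nothing. A minimal sketch of that idea follows. LeakWatch and LeakWatchFactory are hypothetical names chosen for illustration, and the code is not taken from HikariCP.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a leak task; the names here are illustrative only. */
class LeakWatch {
    /** Shared do-nothing instance handed out when detection is disabled. */
    static final LeakWatch DISABLED = new LeakWatch() {
        @Override
        void cancel() {
            // nothing was scheduled, so there is nothing to cancel
        }
    };

    private ScheduledFuture<?> future;

    private LeakWatch() {}

    LeakWatch(ScheduledExecutorService executor, long thresholdMs, Runnable onLeak) {
        // The report runs only if cancel() is not called within thresholdMs
        this.future = executor.schedule(onLeak, thresholdMs, TimeUnit.MILLISECONDS);
    }

    void cancel() {
        future.cancel(false);
    }
}

/** Chooses between the shared no-op and a freshly scheduled watch. */
final class LeakWatchFactory {
    private final long thresholdMs;
    private final ScheduledExecutorService executor;

    LeakWatchFactory(long thresholdMs, ScheduledExecutorService executor) {
        this.thresholdMs = thresholdMs;
        this.executor = executor;
    }

    LeakWatch watch(Runnable onLeak) {
        return thresholdMs == 0 ? LeakWatch.DISABLED : new LeakWatch(executor, thresholdMs, onLeak);
    }
}
```

Because the disabled case returns one shared instance, the calling code never needs a null check or an "is detection enabled" branch around cancel().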

ProxyLeakTask

HikariCP-2.7.6-sources.jar!/com/zaxxer/hikari/pool/ProxyLeakTask.java

    /**
     * A Runnable that is scheduled in the future to report leaks. The ScheduledFuture is
     * cancelled if the connection is closed before the leak time expires.
     *
     * @author Brett Wooldridge
     */
    class ProxyLeakTask implements Runnable {
        private static final Logger LOGGER = LoggerFactory.getLogger(ProxyLeakTask.class);
        static final ProxyLeakTask NO_LEAK;

        private ScheduledFuture<?> scheduledFuture;
        private String connectionName;
        private Exception exception;
        private String threadName;
        private boolean isLeaked;

        static {
            NO_LEAK = new ProxyLeakTask() {
                @Override
                void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {}

                @Override
                public void run() {}

                @Override
                public void cancel() {}
            };
        }

        ProxyLeakTask(final PoolEntry poolEntry) {
            this.exception = new Exception("Apparent connection leak detected");
            this.threadName = Thread.currentThread().getName();
            this.connectionName = poolEntry.connection.toString();
        }

        private ProxyLeakTask() {
        }

        void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {
            scheduledFuture = executorService.schedule(this, leakDetectionThreshold, TimeUnit.MILLISECONDS);
        }

        /** {@inheritDoc} */
        @Override
        public void run() {
            isLeaked = true;

            final StackTraceElement[] stackTrace = exception.getStackTrace();
            final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
            System.arraycopy(stackTrace, 5, trace, 0, trace.length);

            exception.setStackTrace(trace);
            LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
        }

        void cancel() {
            scheduledFuture.cancel(false);
            if (isLeaked) {
                LOGGER.info("Previously reported leaked connection {} on thread {} was returned to the pool (unleaked)", connectionName, threadName);
            }
        }
    }

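As the listing shows, every method of NO_LEAK is a no-op. When a real task fires, it does not throw anything. It logs a warning that carries the Exception("Apparent connection leak detected") created when the connection was borrowed, with the first 5 stack frames removed, so the logged stack trace points at the code that obtained the connection. If the connection is returned later, cancel logs that the previously reported connection has been "unleaked".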
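The reason the exception is created in the constructor rather than in run() is that a stack trace is filled in at construction time, on the constructing thread. A minimal sketch of that capture-now, report-later idea follows. BorrowSiteSketch is a hypothetical name, and unlike the listing above it returns a trimmed copy instead of rewriting the exception's trace.

```java
import java.util.Arrays;

/** Illustrative only: records where and on which thread an object was created. */
final class BorrowSiteSketch {
    private final Exception origin;
    private final String threadName;

    private BorrowSiteSketch() {
        // The stack trace is captured here, not when it is read later
        this.origin = new Exception("Apparent connection leak detected");
        this.threadName = Thread.currentThread().getName();
    }

    /** Stand-in for the pool call that hands out a connection. */
    static BorrowSiteSketch borrow() {
        return new BorrowSiteSketch();
    }

    /** Returns the captured trace without its first framesToSkip internal frames. */
    StackTraceElement[] trimmedTrace(int framesToSkip) {
        StackTraceElement[] full = origin.getStackTrace();
        int skip = Math.min(Math.max(framesToSkip, 0), full.length);
        return Arrays.copyOfRange(full, skip, full.length);
    }

    String threadName() {
        return threadName;
    }
}
```

In this sketch the top frame is the constructor and the next is borrow(), so skipping 2 frames would leave the caller's frame first. The listing above skips 5, presumably because that many pool-internal frames sit between the application call and the task's constructor in that version.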

ProxyConnection.close

HikariCP-2.7.6-sources.jar!/com/zaxxer/hikari/pool/ProxyConnection.java

    /** {@inheritDoc} */
    @Override
    public final void close() throws SQLException {
        // Closing statements can cause connection eviction, so this must run before the conditional below
        closeStatements();

        if (delegate != ClosedConnection.CLOSED_CONNECTION) {
            leakTask.cancel();

            try {
                if (isCommitStateDirty && !isAutoCommit) {
                    delegate.rollback();
                    lastAccess = currentTime();
                    LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
                }

                if (dirtyBits != 0) {
                    poolEntry.resetConnectionState(this, dirtyBits);
                    lastAccess = currentTime();
                }

                delegate.clearWarnings();
            } catch (SQLException e) {
                // when connections are aborted, exceptions are often thrown that should not reach the application
                if (!poolEntry.isMarkedEvicted()) {
                    throw checkException(e);
                }
            } finally {
                delegate = ClosedConnection.CLOSED_CONNECTION;
                poolEntry.recycle(lastAccess);
            }
        }
    }

    @SuppressWarnings("EmptyTryBlock")
    private synchronized void closeStatements() {
        final int size = openStatements.size();
        if (size > 0) {
            for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {
                try (Statement ignored = openStatements.get(i)) {
                    // automatic resource cleanup
                } catch (SQLException e) {
                    LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",
                                poolEntry.getPoolName(), delegate);
                    leakTask.cancel();
                    poolEntry.evict("(exception closing Statements during Connection.close())");
                    delegate = ClosedConnection.CLOSED_CONNECTION;
                }
            }

            openStatements.clear();
        }
    }

    final SQLException checkException(SQLException sqle) {
        SQLException nse = sqle;
        for (int depth = 0; delegate != ClosedConnection.CLOSED_CONNECTION && nse != null && depth < 10; depth++) {
            final String sqlState = nse.getSQLState();
            if (sqlState != null && sqlState.startsWith("08") || ERROR_STATES.contains(sqlState) || ERROR_CODES.contains(nse.getErrorCode())) {
                // broken connection
                LOGGER.warn("{} - Connection {} marked as broken because of SQLSTATE({}), ErrorCode({})",
                            poolEntry.getPoolName(), delegate, sqlState, nse.getErrorCode(), nse);
                leakTask.cancel();
                poolEntry.evict("(connection is broken)");
                delegate = ClosedConnection.CLOSED_CONNECTION;
            } else {
                nse = nse.getNextException();
            }
        }

        return sqle;
    }

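What is borrowed must be returned. When a connection is closed, close() explicitly calls leakTask.cancel() as long as delegate != ClosedConnection.CLOSED_CONNECTION, which cancels the task that would otherwise report a leak. closeStatements and checkException also call leakTask.cancel(), in the paths where they mark the connection as broken and evict it.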
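The borrow/close lifecycle can be sketched without a database: start a timer when the resource is handed out, and cancel it in close(). WatchedHandle below is a hypothetical class built only on JDK scheduling primitives, and the thresholds are deliberately tiny so the demo finishes quickly (far below the 2-second minimum the real setting enforces).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical resource that reports itself if it is held longer than a threshold. */
final class WatchedHandle implements AutoCloseable {
    private final ScheduledFuture<?> leakTimer;

    WatchedHandle(ScheduledExecutorService executor, long thresholdMs, Runnable onLeak) {
        // Started at "borrow" time; fires only if close() has not run by then
        this.leakTimer = executor.schedule(onLeak, thresholdMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        // Returning the resource cancels the pending report
        leakTimer.cancel(false);
    }

    /** Holds a handle open for up to holdMs and tells whether the leak report fired meanwhile. */
    static boolean reportedWhileHeld(long thresholdMs, long holdMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch fired = new CountDownLatch(1);
        try (WatchedHandle handle = new WatchedHandle(executor, thresholdMs, fired::countDown)) {
            return fired.await(holdMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(reportedWhileHeld(50, 2_000)); // held well past the threshold
        System.out.println(reportedWhileHeld(2_000, 50)); // closed long before it
    }
}
```

The try-with-resources form is also the usual way to avoid leak reports in application code, since close() runs on every exit path.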

HikariPool.evictConnection

HikariCP-2.7.6-sources.jar!/com/zaxxer/hikari/pool/HikariPool.java

    /**
     * Evict a Connection from the pool.
     *
     * @param connection the Connection to evict (actually a {@link ProxyConnection})
     */
    public void evictConnection(Connection connection) {
        ProxyConnection proxyConnection = (ProxyConnection) connection;
        proxyConnection.cancelLeakTask();

        try {
            softEvictConnection(proxyConnection.getPoolEntry(), "(connection evicted by user)", !connection.isClosed() /* owner */);
        } catch (SQLException e) {
            // unreachable in HikariCP, but we're still forced to catch it
        }
    }

If the user calls evictConnection manually, the leak task is cancelled as well, via cancelLeakTask.

Example

    /**
     * leak-detection-threshold: 5000
     * @throws SQLException
     * @throws InterruptedException
     */
    @Test
    public void testConnLeak() throws SQLException, InterruptedException {
        Connection conn = dataSource.getConnection();
        TimeUnit.SECONDS.sleep(10);
        String sql = "select 1";
        PreparedStatement pstmt = null;
        try {
            pstmt = (PreparedStatement)conn.prepareStatement(sql);
            ResultSet rs = pstmt.executeQuery();
            int col = rs.getMetaData().getColumnCount();
            while (rs.next()) {
                for (int i = 1; i

