druid数据源源码解读及参数解释 |
您所在的位置:网站首页 › 上海美博会怎么参加 › druid数据源源码解读及参数解释 |
如果我没猜错的话:目前线上在跑的项目里,不少数据源配置,参数等都是拍脑袋随便写出来的。当出现故障(数据库宕机,惊现慢sql,网络抖动等)时,你的参数并不能保护好服务,甚至还可能引起一系列的雪崩。 本文以目前druid的最新版(1.2.5)为例,讲解数据源中主要参数作用以及推荐值。并给出各种意外场景下的压测结果。对于其他类型的数据源,也可以参考使用。 前言,数据源作用 一句话,数据源就是用来存储数据库连接的。druid中存储数据源连接的属性如下,嗯,就是一个数组。 /** com.alibaba.druid.pool.DruidDataSource */ private volatile DruidConnectionHolder[] connections; 复制代码所以,我们用他干的最多的事,就是拿连接: Connection getConnection() throws SQLException; Connection getConnection(String username, String password) throws SQLException; 复制代码 池中最大活跃连接数:maxActive数据源中最好理解的一个参数。数据源最多可以创建多少个连接,就是由这个参数决定的。 池中初始连接数:initialSize在数据源初始化时,创建的初始连接数。 代码位于com.alibaba.druid.pool.DruidDataSource#init 根据asyncInit决定是同步创建,还是异步创建。 if (createScheduler != null && asyncInit) { for (int i = 0; i < initialSize; ++i) { submitCreateTask(true); } } else if (!asyncInit) { // init connections while (poolingCount < initialSize) { try { PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection(); DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo); connections[poolingCount++] = holder; } catch (SQLException ex) { ... } } } 复制代码 等待取连接的最长时间:maxWait从数据源中取一个连接出来,有两种情况: 目前有连接,直接返回。 目前没有连接 -> 连接池已满 -> 等待其他线程释放连接。 连接池未满 -> 同步创建连接,完成后直接返回 异步创建连接,完成后通知而这个参数,正是针对目前没有连接的情况设计的,我们跟代码看一下详细流程: 拿连接重试(notFullTimeoutRetryCount)该方法的入参就是maxWait,拿主要逻辑在getConnectionInternal中,这里有一个非常重要的参数:notFullTimeoutRetryCount,当连接池不满时,拿连接失败的重试次数。 根据下面的代码逻辑可以看出,druid中写死的为至少重试一次。 public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException { int notFullTimeoutRetryCnt = 0; for (;;) { // handle notFullTimeoutRetry DruidPooledConnection poolableConnection; try { poolableConnection = getConnectionInternal(maxWaitMillis); } catch (GetConnectionTimeoutException ex) { if (notFullTimeoutRetryCnt 0) { createDirect = true; continue; } } 复制代码由于有上面三个条件,只有很少的线程可以进入到同步创建连接流程中。 if (creatingCountUpdater.compareAndSet(this, 0, 1)) { PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection(); holder = new DruidConnectionHolder(this, pyConnInfo); ... creatingCountUpdater.decrementAndGet(this); directCreateCountUpdater.incrementAndGet(this); ... } 复制代码在同步创建连接过程中,存在一个很重要的条件变量转换,下面将failFast时会讲到。 异步创建连接我们先看两个信号量: protected Condition notEmpty; protected Condition empty; 复制代码这两个信号量分别当前池中连接不为空的通知,和连接为空的通知。 当决定要异步创建连接后,会根据配置maxWait是否大于0走两个不同的分支: final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); ... if (maxWait > 0) { holder = pollLast(nanos); } else { holder = takeLast(); } 复制代码先看pollLast(nanos),如果池中没有连接,直接使用信号量:empty,并创建异步线程去创建连接。 private void emptySignal() { ... if (createTaskCount >= maxCreateTaskCount) { return; } if (activeCount + poolingCount + createTaskCount >= maxActive) { return; } submitCreateTask(false); } 复制代码然后,自己使用notEmpty信号量进入等待: estimate = notEmpty.awaitNanos(estimate); // signal by // recycle or // creator 复制代码本次等待的最长时间estimate就是根据maxWait计算出来的。 这里为什么是说本次等待?因为在有连接被回收,或新创建出来后,他是会被唤醒的。但是,如果唤醒后本线程没有抢到连接,并且目前总的等待时间还没到maxWait,就会再次进入await。 而无参的takeLast()就会简单很多: 流程和pollLast(nanos)几乎一致,唯一的不同就是:他的等待没有超时时间,只有等待回收线程,或创建线程唤醒后,才可以继续执行。 while (poolingCount == 0) { ... notEmpty.await(); // signal by recycle or creator ... } 复制代码 异步创建线程回到刚才的emptySignal()方法中,在最后一行代码中,创建了一个明为CreateConnectionTask的线程,并提交到线程池执行。 try { physicalConnection = createPhysicalConnection(); } catch (OutOfMemoryError e) { errorCount++; if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) { // fail over retry attempts setFailContinuous(true); if (failFast) { lock.lock(); try { notEmpty.signalAll(); } finally { ock.unlock(); } } } ... } 复制代码这里的setFailContinuous也和failFast有关,后面会讲到。 连接回收当调用DruidPooledConnection的close()方法后,会进入到连接回收流程中(所以说呢,数据源中的连接关闭并不是真的关闭)。 protected void recycle(DruidPooledConnection pooledConnection) throws SQLException { ... result = putLast(holder, currentTimeMillis); ... } 复制代码流程和取连接类似,这里调用了putLast方法,将连接还回去。 同样的,这里也会通过调用notEmpty.signal(),唤醒一个等待连接的线程。 boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) { ... e.lastActiveTimeMillis = lastActiveTimeMillis; connections[poolingCount] = e; incrementPoolingCount(); ... notEmpty.signal(); notEmptySignalCount++; return true; } 复制代码 MaxWait总结该参数是在数据源目前没有可用的情况下,单次获取连接的最长等待时间。 该超时时间主要消耗在:创建连接或等待其他线程释放连接。 并且,在连接池未满的情况下,会有至少一次重试机制(无法关闭)。 最大等待连接线程个数 maxWaitThreadCount理解了MaxWait,这个就简单多了。 首先,我们知道:druid在池中没有可用连接,而总连接数没达到maxActive时,druid会采取新建异步线程去创建连接,而主线程等待。 那么,同时最多可以有多少个线程在等待呢?就是maxWaitThreadCount。 相关代码在com.alibaba.druid.pool.DruidDataSource#getConnectionInternal中。 if (maxWaitThreadCount > 0 && notEmptyWaitThreadCount >= maxWaitThreadCount) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + lock.getQueueLength()); } 复制代码也就是说,当目前等待连接的线程数>maxWaitThreadCount时,直接失败,自己不再等创建连接,或回收连接。 等待快速失败 failFast顾名思义,快速失败。改配置项是一个开关,取值为true/false。 在当前池中无连接时触发,代码如下: private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException { long estimate = nanos; for (;;) { if (poolingCount == 0) { emptySignal(); // send signal to CreateThread create connection if (failFast && isFailContinuous()) { throw new DataSourceNotAvailableException(createError); } ... } ... } 复制代码同样的,takeLast中也有这串逻辑。 DruidConnectionHolder takeLast() throws InterruptedException, SQLException { ... while (poolingCount == 0) { emptySignal(); // send signal to CreateThread create connection if (failFast && isFailContinuous()) { throw new DataSourceNotAvailableException(createError); } ... } ... } 复制代码大家也注意到了,关于本次请求是否执行快速失败,还有一个关键参数:isFailContinuous()。 isFailContinuous该变量采用原子更新0/1标记当前是否满足快速失败状态。 在创建连接的catch部分代码中,会将其翻转为1,标识快速失败条件满足。 这样,当开启快速失败配置后,所有将要通过异步流程创建连接的的请求都会直接失败。而不再进入到异步/等待过程中。 哪何时才会将该标记翻转为0呢? 当然是有线程创建连接成功后。 相关代码位于com.alibaba.druid.pool.DruidAbstractDataSource#createPhysicalConnection。 public PhysicalConnectionInfo createPhysicalConnection() throws SQLException { ... conn = createPhysicalConnection(url, physicalConnectProperties); initPhysicalConnection(conn, variables, globalVariables); ... initedNanos = System.nanoTime(); validateConnection(conn); validatedNanos = System.nanoTime(); setFailContinuous(false); setCreateError(null); ... } 复制代码那么,当快速失败开启后,还有哪些机会可以使其状态翻转呢? 异步创建连接流程 别忘了,就算快速失败开启,也是先创建后台异步线程才失败的。 同步创建连接流程 虽然通过同步创建的连接很少,但是也是有的呀。 failFast总结failFast与maxWait配合使用。当出现创建连接失败时,后续因为池中为空,而想要异步创建连接的线程不会再等待maxWait,直接快速失败。 同时,只要有线程创建连接成功,便会关闭failFast。 物理连接超时时间 phyTimeoutMillis单个物理连接的最长保持时间。 因为数据源本身就是保持/缓存数据库连接的,一般情况下,不会主动端口与DB的连接。但是,如果将phyTimeoutMillis配置为一个>0的数值,当一个物理连接开启超过配置时间后,就会关闭连接(当然是在确保目前该连接没有再使用的情况下)。 if (phyTimeoutMillis > 0) { long phyConnectTimeMillis = currentTimeMillis - holder.connectTimeMillis; if (phyConnectTimeMillis > phyTimeoutMillis) { discardConnection(holder); return; } } 复制代码 最大异步创建连接任务数 maxCreateTaskCount在连接不足,需要异步创建连接时,存在两种情况不会再去添加异步任务创建新连接。 当前活跃连接数 + 当前池中连接数 + 当前创建连接任务数 > maxActive时。 当前创建连接任务数 > maxCreateTaskCount 时。但是,尽管当前主线程没有去提交异步创建连接任务,他也一样会使用notEmpty.await等待连接。 上层调用方并不关心有没有真的去提交CreateTask任务。 private void emptySignal() { ... if (createTaskCount >= maxCreateTaskCount) { return; } if (activeCount + poolingCount + createTaskCount >= maxActive) { return; } submitCreateTask(false); } 复制代码 连接保活 keepAlive & keepAliveBetweenTimeMillisdruid中的keepAlive选项与tcp中的类似。 为了防止一个数据库连接太久没有使用,而被其他下层服务关闭,druid中也定义了keepAlive选项。 如果keepAlive打开,当一个连接的空闲时间超过keepAliveBetweenTimeMillis时,就会使用validationQuery执行一次查询。 if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) { keepAliveConnections[keepAliveCount++] = connection; } ... for (int i = keepAliveCount - 1; i >= 0; --i) { DruidConnectionHolder holer = keepAliveConnections[i]; Connection connection = holer.getConnection(); holer.incrementKeepAliveCheckCount(); boolean validate = false; try { this.validateConnection(connection); validate = true; } catch (Throwable error) { if (LOG.isDebugEnabled()) { LOG.debug("keepAliveErr", error); } // skip } ... } 复制代码如果本次validationQuery执行失败,则关闭该连接,并丢弃。 数据源收缩 timeBetweenEvictionRunsMillis & minEvictableIdleTimeMillis & maxEvictableIdleTimeMillis在druid数据源初始化时,会创建一个定时运行的DestroyTask。 该任务的主要目的就是将已空闲时间满足关闭条件的连接关闭。 if (idleMillis >= minEvictableIdleTimeMillis) { if (checkTime && i < checkCount) { evictConnections[evictCount++] = connection; continue; } else if (idleMillis > maxEvictableIdleTimeMillis) { evictConnections[evictCount++] = connection; continue; } } ... if (evictCount > 0) { for (int i = 0; i < evictCount; ++i) { DruidConnectionHolder item = evictConnections[i]; Connection connection = item.getConnection(); JdbcUtils.close(connection); destroyCountUpdater.incrementAndGet(this); } Arrays.fill(evictConnections, null); } 复制代码从上面的代码中可以看出,对于要关闭的空闲连接选择逻辑如下: if (checkTime && i < checkCount) : 对于空闲时间>minEvictableIdleTimeMillis的连接,仅会关闭前poolingCount - minIdle个,后面的连接不会受到影响。 (也就是只扫描前checkCount个,checkCount即为当前池中连接数超出minIdle的个数)。 而对于空闲时间>maxEvictableIdleTimeMillis的连接,会直接进行关闭。 最后,timeBetweenEvictionRunsMillis就是该定时收缩任务的运行间隔: long period = timeBetweenEvictionRunsMillis; if (period |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |