druid数据源源码解读及参数解释

您所在的位置:网站首页 上海美博会怎么参加 druid数据源源码解读及参数解释

druid数据源源码解读及参数解释

2023-08-01 12:55| 来源: 网络整理| 查看: 265

如果我没猜错的话:目前线上在跑的项目里,不少数据源配置,参数等都是拍脑袋随便写出来的。当出现故障(数据库宕机,惊现慢sql,网络抖动等)时,你的参数并不能保护好服务,甚至还可能引起一系列的雪崩。

本文以目前druid的最新版(1.2.5)为例,讲解数据源中主要参数作用以及推荐值。并给出各种意外场景下的压测结果。对于其他类型的数据源,也可以参考使用。

前言,数据源作用

一句话,数据源就是用来存储数据库连接的。druid中存储数据源连接的属性如下,嗯,就是一个数组。

/** com.alibaba.druid.pool.DruidDataSource */ private volatile DruidConnectionHolder[] connections; 复制代码

所以,我们用他干的最多的事,就是拿连接:

Connection getConnection() throws SQLException; Connection getConnection(String username, String password) throws SQLException; 复制代码 池中最大活跃连接数:maxActive

数据源中最好理解的一个参数。数据源最多可以创建多少个连接,就是由这个参数决定的。

池中初始连接数:initialSize

在数据源初始化时,创建的初始连接数。

代码位于com.alibaba.druid.pool.DruidDataSource#init

根据asyncInit决定是同步创建,还是异步创建。

if (createScheduler != null && asyncInit) { for (int i = 0; i < initialSize; ++i) { submitCreateTask(true); } } else if (!asyncInit) { // init connections while (poolingCount < initialSize) { try { PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection(); DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo); connections[poolingCount++] = holder; } catch (SQLException ex) { ... } } } 复制代码 等待取连接的最长时间:maxWait

从数据源中取一个连接出来,有两种情况:

目前有连接,直接返回。

目前没有连接 ->

连接池已满 -> 等待其他线程释放连接。 连接池未满 -> 同步创建连接,完成后直接返回 异步创建连接,完成后通知

而这个参数,正是针对目前没有连接的情况设计的,我们跟代码看一下详细流程:

拿连接重试(notFullTimeoutRetryCount)

该方法的入参就是maxWait,拿主要逻辑在getConnectionInternal中,这里有一个非常重要的参数:notFullTimeoutRetryCount,当连接池不满时,拿连接失败的重试次数。

根据下面的代码逻辑可以看出,druid中写死的为至少重试一次。

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException { int notFullTimeoutRetryCnt = 0; for (;;) { // handle notFullTimeoutRetry DruidPooledConnection poolableConnection; try { poolableConnection = getConnectionInternal(maxWaitMillis); } catch (GetConnectionTimeoutException ex) { if (notFullTimeoutRetryCnt 0) { createDirect = true; continue; } } 复制代码

由于有上面三个条件,只有很少的线程可以进入到同步创建连接流程中。

if (creatingCountUpdater.compareAndSet(this, 0, 1)) { PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection(); holder = new DruidConnectionHolder(this, pyConnInfo); ... creatingCountUpdater.decrementAndGet(this); directCreateCountUpdater.incrementAndGet(this); ... } 复制代码

在同步创建连接过程中,存在一个很重要的条件变量转换,下面将failFast时会讲到。

异步创建连接

我们先看两个信号量:

protected Condition notEmpty; protected Condition empty; 复制代码

这两个信号量分别当前池中连接不为空的通知,和连接为空的通知。

当决定要异步创建连接后,会根据配置maxWait是否大于0走两个不同的分支:

final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); ... if (maxWait > 0) { holder = pollLast(nanos); } else { holder = takeLast(); } 复制代码

先看pollLast(nanos),如果池中没有连接,直接使用信号量:empty,并创建异步线程去创建连接。

private void emptySignal() { ... if (createTaskCount >= maxCreateTaskCount) { return; } if (activeCount + poolingCount + createTaskCount >= maxActive) { return; } submitCreateTask(false); } 复制代码

然后,自己使用notEmpty信号量进入等待:

estimate = notEmpty.awaitNanos(estimate); // signal by // recycle or // creator 复制代码

本次等待的最长时间estimate就是根据maxWait计算出来的。

这里为什么是说本次等待?因为在有连接被回收,或新创建出来后,他是会被唤醒的。但是,如果唤醒后本线程没有抢到连接,并且目前总的等待时间还没到maxWait,就会再次进入await。

而无参的takeLast()就会简单很多:

流程和pollLast(nanos)几乎一致,唯一的不同就是:他的等待没有超时时间,只有等待回收线程,或创建线程唤醒后,才可以继续执行。

while (poolingCount == 0) { ... notEmpty.await(); // signal by recycle or creator ... } 复制代码 异步创建线程

回到刚才的emptySignal()方法中,在最后一行代码中,创建了一个明为CreateConnectionTask的线程,并提交到线程池执行。

try { physicalConnection = createPhysicalConnection(); } catch (OutOfMemoryError e) { errorCount++; if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) { // fail over retry attempts setFailContinuous(true); if (failFast) { lock.lock(); try { notEmpty.signalAll(); } finally { ock.unlock(); } } } ... } 复制代码

这里的setFailContinuous也和failFast有关,后面会讲到。

连接回收

当调用DruidPooledConnection的close()方法后,会进入到连接回收流程中(所以说呢,数据源中的连接关闭并不是真的关闭)。

protected void recycle(DruidPooledConnection pooledConnection) throws SQLException { ... result = putLast(holder, currentTimeMillis); ... } 复制代码

流程和取连接类似,这里调用了putLast方法,将连接还回去。

同样的,这里也会通过调用notEmpty.signal(),唤醒一个等待连接的线程。

boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) { ... e.lastActiveTimeMillis = lastActiveTimeMillis; connections[poolingCount] = e; incrementPoolingCount(); ... notEmpty.signal(); notEmptySignalCount++; return true; } 复制代码 MaxWait总结

该参数是在数据源目前没有可用的情况下,单次获取连接的最长等待时间。

该超时时间主要消耗在:创建连接或等待其他线程释放连接。

并且,在连接池未满的情况下,会有至少一次重试机制(无法关闭)。

最大等待连接线程个数 maxWaitThreadCount

理解了MaxWait,这个就简单多了。

首先,我们知道:druid在池中没有可用连接,而总连接数没达到maxActive时,druid会采取新建异步线程去创建连接,而主线程等待。

那么,同时最多可以有多少个线程在等待呢?就是maxWaitThreadCount。

相关代码在com.alibaba.druid.pool.DruidDataSource#getConnectionInternal中。

if (maxWaitThreadCount > 0 && notEmptyWaitThreadCount >= maxWaitThreadCount) { connectErrorCountUpdater.incrementAndGet(this); throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + lock.getQueueLength()); } 复制代码

也就是说,当目前等待连接的线程数>maxWaitThreadCount时,直接失败,自己不再等创建连接,或回收连接。

等待快速失败 failFast

顾名思义,快速失败。改配置项是一个开关,取值为true/false。

在当前池中无连接时触发,代码如下:

private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException { long estimate = nanos; for (;;) { if (poolingCount == 0) { emptySignal(); // send signal to CreateThread create connection if (failFast && isFailContinuous()) { throw new DataSourceNotAvailableException(createError); } ... } ... } 复制代码

同样的,takeLast中也有这串逻辑。

DruidConnectionHolder takeLast() throws InterruptedException, SQLException { ... while (poolingCount == 0) { emptySignal(); // send signal to CreateThread create connection if (failFast && isFailContinuous()) { throw new DataSourceNotAvailableException(createError); } ... } ... } 复制代码

大家也注意到了,关于本次请求是否执行快速失败,还有一个关键参数:isFailContinuous()。

isFailContinuous

该变量采用原子更新0/1标记当前是否满足快速失败状态。

在创建连接的catch部分代码中,会将其翻转为1,标识快速失败条件满足。

这样,当开启快速失败配置后,所有将要通过异步流程创建连接的的请求都会直接失败。而不再进入到异步/等待过程中。

哪何时才会将该标记翻转为0呢?

当然是有线程创建连接成功后。

相关代码位于com.alibaba.druid.pool.DruidAbstractDataSource#createPhysicalConnection。

public PhysicalConnectionInfo createPhysicalConnection() throws SQLException { ... conn = createPhysicalConnection(url, physicalConnectProperties); initPhysicalConnection(conn, variables, globalVariables); ... initedNanos = System.nanoTime(); validateConnection(conn); validatedNanos = System.nanoTime(); setFailContinuous(false); setCreateError(null); ... } 复制代码

那么,当快速失败开启后,还有哪些机会可以使其状态翻转呢?

异步创建连接流程

别忘了,就算快速失败开启,也是先创建后台异步线程才失败的。

同步创建连接流程

虽然通过同步创建的连接很少,但是也是有的呀。

failFast总结

failFast与maxWait配合使用。当出现创建连接失败时,后续因为池中为空,而想要异步创建连接的线程不会再等待maxWait,直接快速失败。

同时,只要有线程创建连接成功,便会关闭failFast。

物理连接超时时间 phyTimeoutMillis

单个物理连接的最长保持时间。

因为数据源本身就是保持/缓存数据库连接的,一般情况下,不会主动端口与DB的连接。但是,如果将phyTimeoutMillis配置为一个>0的数值,当一个物理连接开启超过配置时间后,就会关闭连接(当然是在确保目前该连接没有再使用的情况下)。

if (phyTimeoutMillis > 0) { long phyConnectTimeMillis = currentTimeMillis - holder.connectTimeMillis; if (phyConnectTimeMillis > phyTimeoutMillis) { discardConnection(holder); return; } } 复制代码 最大异步创建连接任务数 maxCreateTaskCount

在连接不足,需要异步创建连接时,存在两种情况不会再去添加异步任务创建新连接。

当前活跃连接数 + 当前池中连接数 + 当前创建连接任务数 > maxActive时。 当前创建连接任务数 > maxCreateTaskCount 时。

但是,尽管当前主线程没有去提交异步创建连接任务,他也一样会使用notEmpty.await等待连接。

上层调用方并不关心有没有真的去提交CreateTask任务。

private void emptySignal() { ... if (createTaskCount >= maxCreateTaskCount) { return; } if (activeCount + poolingCount + createTaskCount >= maxActive) { return; } submitCreateTask(false); } 复制代码 连接保活 keepAlive & keepAliveBetweenTimeMillis

druid中的keepAlive选项与tcp中的类似。

为了防止一个数据库连接太久没有使用,而被其他下层服务关闭,druid中也定义了keepAlive选项。

如果keepAlive打开,当一个连接的空闲时间超过keepAliveBetweenTimeMillis时,就会使用validationQuery执行一次查询。

if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) { keepAliveConnections[keepAliveCount++] = connection; } ... for (int i = keepAliveCount - 1; i >= 0; --i) { DruidConnectionHolder holer = keepAliveConnections[i]; Connection connection = holer.getConnection(); holer.incrementKeepAliveCheckCount(); boolean validate = false; try { this.validateConnection(connection); validate = true; } catch (Throwable error) { if (LOG.isDebugEnabled()) { LOG.debug("keepAliveErr", error); } // skip } ... } 复制代码

如果本次validationQuery执行失败,则关闭该连接,并丢弃。

数据源收缩 timeBetweenEvictionRunsMillis & minEvictableIdleTimeMillis & maxEvictableIdleTimeMillis

在druid数据源初始化时,会创建一个定时运行的DestroyTask。

该任务的主要目的就是将已空闲时间满足关闭条件的连接关闭。

if (idleMillis >= minEvictableIdleTimeMillis) { if (checkTime && i < checkCount) { evictConnections[evictCount++] = connection; continue; } else if (idleMillis > maxEvictableIdleTimeMillis) { evictConnections[evictCount++] = connection; continue; } } ... if (evictCount > 0) { for (int i = 0; i < evictCount; ++i) { DruidConnectionHolder item = evictConnections[i]; Connection connection = item.getConnection(); JdbcUtils.close(connection); destroyCountUpdater.incrementAndGet(this); } Arrays.fill(evictConnections, null); } 复制代码

从上面的代码中可以看出,对于要关闭的空闲连接选择逻辑如下:

if (checkTime && i < checkCount) : 对于空闲时间>minEvictableIdleTimeMillis的连接,仅会关闭前poolingCount - minIdle个,后面的连接不会受到影响。 (也就是只扫描前checkCount个,checkCount即为当前池中连接数超出minIdle的个数)。

而对于空闲时间>maxEvictableIdleTimeMillis的连接,会直接进行关闭。

最后,timeBetweenEvictionRunsMillis就是该定时收缩任务的运行间隔:

long period = timeBetweenEvictionRunsMillis; if (period


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3