您现在的位置是:首页 >技术教程 >Druid连接池源码分析网站首页技术教程
Druid连接池源码分析
项目监控:Druid Stat Indexhttp://120.26.192.168/druid/index.html
一、核心配置文件介绍
序号 | 字段 | 默认 | 内容 |
1 | maxWait | -1 | 获取连接等待超时的时间(单位ms) |
2 | inited | FALSE | DruidDataSource获取连接时初始化 |
3 | initialSize | 0 | 连接池初始化时创建的连接数 |
4 | minIdle | 0 | 最小连接池数量 |
5 | maxActive | 8 | 最大连接池数量 |
6 | removeAbandoned | FALSE | 链接使用超过时间限制是否回收 |
7 | removeAbandonedTimeout | 300s | 任务超时时间限制 |
8 | keepAlive | FALSE | 连接池中数量在minIdle以内的连接,空闲时间超过minEvictableIdleTimeMillis时,执行keepAlive操作 |
9 | validationQuery | null | 检测连接有效的sql,如果为null,testOnBorrow、testOnReturn、testWhileIdle都不生效 |
10 | testOnBorrow | false | 申请连接时检查连接有效性,开启时不校验testWhileIdle |
11 | validationQueryTimeout | -1 | 检测连接是否有效 超时时间 |
12 | testWhileIdle | TRUE | 申请连接时如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测有效性 |
13 | timeBetweenEvictionRunsMillis | 60s | 1、testWhileIdle检测时间; 2、Destroy线程检测连接时间间隔,如果连接空闲时间大于minEvictableIdleTimeMillis,会执行keepAlive操作 |
14 | minEvictableIdleTimeMillis | 30min | 连接保持空闲不被注销最小时间 |
15 | removeAbandoned | false | 链接使用超过时间限制是否回收(与序号6重复) |
16 | removeAbandonedTimeoutMillis | 300s | 任务超时时间限制 |
17 | testOnReturn | FALSE | 往连接池归还连接时是否检查连接有效性 |
18 | filters | 扩展插件 |
type=com.alibaba.druid.pool.DruidDataSource
queryTimeout=15
#tcp/ip配置方式
maximum-pool-size=300
#配置初始化大小、最小、最大
initial-size=5
min-idle=5
max-active=200
#配置获取连接等待超时的时间
max-wait=10000
#配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒。检测时:1.如果连接空闲并且超过minIdle以外的连接,如果空闲时间超过minEvictableIdleTimeMillis设置的值则直接物理关闭。2.在minIdle以内的不处理。
time-between-eviction-runs-millis=60000
#配置一个连接在池中最小生存的时间,单位是毫秒。testWhileIdle为true时,如果连接空闲时间超过minEvictableIdleTimeMillis进行检查,否则不检查;false时,不检查
min-evictable-idle-time-millis=30000
#检验连接是否有效的查询语句
validation-query=SELECT 1
#设置从连接池获取连接时是否检查连接有效性,true时,每次都检查;false时,不检查
test-on-borrow=false
#设置往连接池归还连接时是否检查连接有效性,true时,每次都检查;false时,不检查
test-on-return=false
#设置从连接池获取连接时是否检查连接有效性,true时,如果连接空闲时间超过timeBetweenEvictionRunsMillis进行检查,否则不检查;false时,不检查
test-while-idle=true
#链接使用超过时间限制是否回收
remove-abandoned=true
#超过时间限制时间(单位秒),目前为5分钟,如果有业务处理时间超过5分钟,可以适当调整。
remove-abandoned-timeout=300
#配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
filters=stat,wall,log4j
二、初始化
DruidDataSource的初始化有两个触发时机:加载druid(创建数据源)时和获取数据库连接时。当设置init=true时,会在createDataSource时调用init()方法完成初始化;否则会在调用getConnection时再进行初始化
/**
 * Initializes this data source: checks the pool settings, allocates the holder arrays,
 * optionally pre-creates {@code initialSize} connections and starts the log/create/destroy workers.
 * Returns immediately when {@code inited} is already set.
 *
 * @throws SQLException if the calling thread is interrupted, if maxEvictableIdleTimeMillis is
 *         smaller than minEvictableIdleTimeMillis, or if creating the initial connections failed
 *         and no connection at all ended up in the pool
 * @throws IllegalArgumentException if maxActive, minIdle, initialSize or the stat-logging
 *         settings are inconsistent
 */
public void init() throws SQLException {
// 1. Fast path: nothing to do once inited is set (the field is declared outside this excerpt;
//    presumably volatile — TODO confirm).
if (inited) {
return;
}
// bug fixed for dead lock, for issue #2980
// 2. Touch DruidDriver up front; see https://github.com/alibaba/druid/issues/2980
DruidDriver.getInstance();
// 3. Take the lock, then re-check inited below so that concurrent callers do not initialize twice.
final ReentrantLock lock = this.lock;
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
// Set to true only after the workers have started; it gates the "inited" log line in finally.
boolean init = false;
try {
if (inited) {
return;
}
initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
// 4. Obtain this data source's id from DruidDriver.
this.id = DruidDriver.createDataSourceId();
// 5. For id > 1, shift every id seed by (id - 1) * 100000 — presumably so that ids issued by
//    different data sources do not overlap.
if (this.id > 1) {
long delta = (this.id - 1) * 100000;
this.connectionIdSeedUpdater.addAndGet(this, delta);
this.statementIdSeedUpdater.addAndGet(this, delta);
this.resultSetIdSeedUpdater.addAndGet(this, delta);
this.transactionIdSeedUpdater.addAndGet(this, delta);
}
if (this.jdbcUrl != null) {
this.jdbcUrl = this.jdbcUrl.trim();
// 6. NOTE(review): presumably handles Druid's "jdbc:wrap-jdbc:" URL form — confirm in initFromWrapDriverUrl().
initFromWrapDriverUrl();
}
for (Filter filter : filters) {
filter.init(this);
}
if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
// 7. Derive the database type from the JDBC URL when it was not configured explicitly.
this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
}
DbType dbType = DbType.of(this.dbTypeName);
if (dbType == DbType.mysql
|| dbType == DbType.mariadb
|| dbType == DbType.oceanbase
|| dbType == DbType.ads) {
boolean cacheServerConfigurationSet = false;
if (this.connectProperties.containsKey("cacheServerConfiguration")) {
cacheServerConfigurationSet = true;
// NOTE(review): jdbcUrl is null-checked above but dereferenced here without a check.
} else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
cacheServerConfigurationSet = true;
}
if (cacheServerConfigurationSet) {
this.connectProperties.put("cacheServerConfiguration", "true");
}
}
if (maxActive <= 0) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
// NOTE(review): this failure is about minIdle exceeding maxActive, but the message does not mention minIdle.
if (maxActive < minIdle) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
if (getInitialSize() > maxActive) {
throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
}
if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
}
if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
}
if (this.driverClass != null) {
this.driverClass = driverClass.trim();
}
// 8. Resolve configuration-dependent collaborators.
// 8.1 NOTE(review): presumably loads Filter implementations through the ServiceLoader (SPI) mechanism — confirm.
initFromSPIServiceLoader();
// 8.2 Resolve the JDBC driver.
resolveDriver();
// 8.3 NOTE(review): presumably sanity-checks validationQuery for Oracle/DB2 — confirm in initCheck().
initCheck();
// 8.4 Set up the ExceptionSorter (presumably chosen by dbType).
initExceptionSorter();
// 8.5 Set up the ValidConnectionChecker (presumably chosen by dbType).
initValidConnectionChecker();
// 8.6 Check the validation-related settings (presumably only reports problems — confirm).
validationQueryCheck();
// Use the shared global stat object when configured, creating it on first use; otherwise a private one.
if (isUseGlobalDataSourceStat()) {
dataSourceStat = JdbcDataSourceStat.getGlobal();
if (dataSourceStat == null) {
dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbTypeName);
JdbcDataSourceStat.setGlobal(dataSourceStat);
}
if (dataSourceStat.getDbType() == null) {
dataSourceStat.setDbType(this.dbTypeName);
}
} else {
dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbTypeName, this.connectProperties);
}
dataSourceStat.setResetStatEnable(this.resetStatEnable);
// 9. Allocate the holder arrays, each sized to maxActive: pooled, to-evict and keep-alive connections.
connections = new DruidConnectionHolder[maxActive];
evictConnections = new DruidConnectionHolder[maxActive];
keepAliveConnections = new DruidConnectionHolder[maxActive];
SQLException connectError = null;
// 10. Create the initial connections.
if (createScheduler != null && asyncInit) {
for (int i = 0; i < initialSize; ++i) {
// 10.1 Asynchronous: submit one create task per initial connection.
submitCreateTask(true);
}
} else if (!asyncInit) {
// init connections
// 10.2 Synchronous: keep creating until the pool holds initialSize connections.
while (poolingCount < initialSize) {
try {
// 10.2.1 Open a physical connection and wrap it in a holder.
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount++] = holder;
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
// Not failing fast: wait 3 seconds, then the loop tries again.
Thread.sleep(3000);
}
}
}
if (poolingCount > 0) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
}
// 11. Start the background workers.
// 11.1 Logging thread (presumably periodic stat logging — confirm in createAndLogThread()).
createAndLogThread();
// 11.2 Connection creator thread.
createAndStartCreatorThread();
// 11.3 Connection destroy thread.
createAndStartDestroyThread();
// 11.4 Block until the latch opens (presumably counted down by the creator and destroy starters — confirm).
initedLatch.await();
init = true;
initedTime = new Date();
// 12. Register the JMX MBean.
registerMbean();
// Surface the recorded connect failure only if no connection at all could be pooled.
if (connectError != null && poolingCount == 0) {
throw connectError;
}
// 13. With keepAlive enabled, top the pool up towards minIdle.
if (keepAlive) {
// async fill to minIdle
if (createScheduler != null) {
for (int i = 0; i < minIdle; ++i) {
// 13.1 With a scheduler: submit minIdle asynchronous create tasks.
submitCreateTask(true);
}
} else {
// 13.2 Without a scheduler: signal "empty" (presumably wakes the creator thread — confirm).
this.emptySignal();
}
}
} catch (SQLException e) {
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e);
} catch (RuntimeException e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (Error e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} finally {
// NOTE: inited is set even when initialization failed, so later calls return at the fast path above.
inited = true;
lock.unlock();
if (init && LOG.isInfoEnabled()) {
String msg = "{dataSource-" + this.getID();
if (this.name != null && !this.name.isEmpty()) {
msg += ",";
msg += this.name;
}
msg += "} inited";
LOG.info(msg);
}
}
}
五、获取连接
1、sql查询入口
1、业务方法api
/**
 * Runs the query described by {@code sql} through the JdbcTemplate and returns its rows,
 * each row mapped by a {@code ColumnMapRowMapper}.
 *
 * @param sql query object supplying the SQL text and the bind parameter values
 * @return the rows produced by the query
 */
public List<Map<String, Object>> getListMapBySql(Sql sql) {
    // The SQL text is passed through noLockTreat before use (its effect is not visible here).
    String statementText = this.noLockTreat(sql.toSqlString());
    MySimplePreparedStatementCreator creator = new MySimplePreparedStatementCreator(statementText);
    ArgumentPreparedStatementSetter binder = new ArgumentPreparedStatementSetter(sql.parametersValue());
    RowMapperResultSetExtractor rowExtractor = new RowMapperResultSetExtractor(new ColumnMapRowMapper());
    return (List) this.jdbcTemplate.query(creator, binder, rowExtractor);
}
2、JdbcTemplate class类执行查询方法
public <T> T query(
PreparedStatementCreator psc, final PreparedStatementSetter pss, final ResultSetExtractor<T> rse)
throws DataAccessException {
--执行查询方法
return execute(psc, new PreparedStatementCallback<T>() {
@Override
public T doInPreparedStatement(PreparedStatement ps) throws SQLException {
ResultSet rs = null;
try {
if (pss != null) {
pss.setValues(ps);
}
rs = ps.executeQuery();
ResultSet rsToUse = rs;
if (nativeJdbcExtractor != null) {
rsToUse = nativeJdbcExtractor.getNativeResultSet(rs);
}
return rse.extractData(rsToUse);
}
finally {
JdbcUtils.closeResultSet(rs);
if (pss instanceof ParameterDisposer) {
((ParameterDisposer) pss).cleanupParameters();
}
}
}
});
}
2.1、JdbcTemplate 任务执行
/**
 * Obtains a connection, creates a PreparedStatement with {@code psc}, runs {@code action} on it,
 * and always closes the statement and releases the connection afterwards.
 *
 * @param psc creates the PreparedStatement
 * @param action callback that works with the prepared statement
 * @return the value returned by {@code action}
 * @throws DataAccessException the translated form of any SQLException raised while executing
 */
public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
throws DataAccessException {
// 1. Obtain a database connection from the configured DataSource.
Connection con = DataSourceUtils.getConnection(getDataSource());
PreparedStatement ps = null;
try {
Connection conToUse = con;
if (this.nativeJdbcExtractor != null &&
this.nativeJdbcExtractor.isNativeConnectionNecessaryForNativePreparedStatements()) {
conToUse = this.nativeJdbcExtractor.getNativeConnection(con);
}
ps = psc.createPreparedStatement(conToUse);
applyStatementSettings(ps);
PreparedStatement psToUse = ps;
if (this.nativeJdbcExtractor != null) {
psToUse = this.nativeJdbcExtractor.getNativePreparedStatement(ps);
}
// 2. Run the callback against the statement.
T result = action.doInPreparedStatement(psToUse);
handleWarnings(ps);
return result;
}
catch (SQLException ex) {
// Release Connection early, to avoid potential connection pool deadlock
// in the case when the exception translator hasn't been initialized yet.
if (psc instanceof ParameterDisposer) {
((ParameterDisposer) psc).cleanupParameters();
}
String sql = getSql(psc);
// psc, ps and con are set to null after cleanup; the finally block below then runs with those null values.
psc = null;
JdbcUtils.closeStatement(ps);
ps = null;
DataSourceUtils.releaseConnection(con, getDataSource());
con = null;
throw getExceptionTranslator().translate("PreparedStatementCallback", sql, ex);
}
finally {
if (psc instanceof ParameterDisposer) {
((ParameterDisposer) psc).cleanupParameters();
}
// 3. Close the statement.
JdbcUtils.closeStatement(ps);
// 4. Release the connection through DataSourceUtils.
DataSourceUtils.releaseConnection(con, getDataSource());
}
}
3、DataSourceUtils工具类获取连接
/**
 * Obtains a connection from {@code dataSource}, converting a checked SQLException
 * into a CannotGetJdbcConnectionException.
 *
 * @param dataSource the DataSource to obtain the connection from
 * @return the connection returned by {@code doGetConnection}
 * @throws CannotGetJdbcConnectionException if {@code doGetConnection} throws a SQLException
 */
public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
    final Connection obtained;
    try {
        obtained = doGetConnection(dataSource);
    } catch (SQLException cause) {
        throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", cause);
    }
    return obtained;
}
/**
 * Asks {@code dataSource} for a connection and returns it unchanged.
 *
 * @param dataSource the DataSource to obtain the connection from
 * @return the connection handed out by the DataSource
 * @throws SQLException if the DataSource fails to provide a connection
 */
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    return dataSource.getConnection();
}
4、获取数据库连接
/**
 * Source of database connections. Declares two ways to obtain a connection:
 * with no arguments, or with an explicit user name and password.
 */
public interface DataSource extends CommonDataSource, Wrapper {
/**
* <p>Attempts to establish a connection with the data source that
* this {@code DataSource} object represents.
*
* @return a connection to the data source
* @exception SQLException if a database access error occurs
* @throws java.sql.SQLTimeoutException when the driver has determined that the
* timeout value specified by the {@code setLoginTimeout} method
* has been exceeded and has at least tried to cancel the
* current database connection attempt
*/
Connection getConnection() throws SQLException;
/**
* <p>Attempts to establish a connection with the data source that
* this {@code DataSource} object represents.
*
* @param username the database user on whose behalf the connection is
* being made
* @param password the user's password
* @return a connection to the data source
* @exception SQLException if a database access error occurs
* @throws java.sql.SQLTimeoutException when the driver has determined that the
* timeout value specified by the {@code setLoginTimeout} method
* has been exceeded and has at least tried to cancel the
* current database connection attempt
* @since 1.4
*/
Connection getConnection(String username, String password)
throws SQLException;
}
2、获取连接
DruidDataSource实现DataSource接口并重写getConnection方法
1、DruidDataSource类重写获取数据库连接的方法
/**
 * Borrows a pooled connection, waiting at most the configured {@code maxWait}.
 *
 * @return a pooled connection
 * @throws SQLException if no connection can be obtained
 */
@Override
public DruidPooledConnection getConnection() throws SQLException {
    final long configuredMaxWait = maxWait;
    return getConnection(configuredMaxWait);
}
/**
 * Borrows a pooled connection. Calls {@code init()} first, then either goes straight to the
 * pool (no filters configured) or routes the request through a filter chain.
 *
 * @param maxWaitMillis wait limit passed on to the pool lookup
 * @return a pooled connection
 * @throws SQLException if initialization or the lookup fails
 */
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
    // Initialize on demand; init() returns at once when already initialized.
    init();

    // No filters configured: take the connection directly from the pool.
    if (filters.size() <= 0) {
        return getConnectionDirect(maxWaitMillis);
    }

    // Filters configured (e.g. stat, wall, log4j): let the chain obtain the connection.
    FilterChainImpl chain = new FilterChainImpl(this);
    return chain.dataSource_connect(this, maxWaitMillis);
}
2、FilterChainImpl class过滤器 定义数据源以及连接池信息
/**
 * Passes the connect request to the next filter in the chain, or to the data source itself
 * once every filter has been visited.
 *
 * @param dataSource the data source that finally supplies the connection
 * @param maxWaitMillis wait limit passed through unchanged
 * @return the pooled connection produced by the next filter or by the data source
 * @throws SQLException if the filter or the data source fails
 */
@Override
public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException {
    // End of the chain: fetch the connection from the pool.
    if (this.pos >= filterSize) {
        return dataSource.getConnectionDirect(maxWaitMillis);
    }
    // Otherwise hand over to the next filter, which is expected to call back into this chain.
    return nextFilter().dataSource_getConnection(this, dataSource, maxWaitMillis);
}
3、例如LogFilter
/**
 * Lets the rest of the chain obtain the connection, then logs a "pool-connect" line for it
 * when connection logging is switched on.
 *
 * @param chain the remaining filter chain
 * @param dataSource the data source supplying the connection
 * @param maxWaitMillis wait limit passed through unchanged
 * @return the connection obtained from the chain
 * @throws SQLException if the chain fails to obtain a connection
 */
@Override
public DruidPooledConnection dataSource_getConnection(FilterChain chain, DruidDataSource dataSource,
                                                      long maxWaitMillis) throws SQLException {
    DruidPooledConnection pooled = chain.dataSource_connect(dataSource, maxWaitMillis);
    // The cast happens unconditionally, as before, even when nothing is logged.
    ConnectionProxy proxy = (ConnectionProxy) pooled.getConnectionHolder().getConnection();

    boolean shouldLog = connectionConnectAfterLogEnable && isConnectionLogEnabled();
    if (shouldLog) {
        connectionLog("{conn-" + proxy.getId() + "} pool-connect");
    }
    return pooled;
}
4、获取连接
/**
 * Borrows a connection from the pool and validates it according to testOnBorrow / testWhileIdle,
 * looping until a usable connection is found. Invalid or closed connections are discarded and
 * the lookup is repeated.
 *
 * @param maxWaitMillis wait limit passed on to {@code getConnectionInternal}
 * @return a connection that passed the configured checks
 * @throws SQLException if no connection can be obtained (including a timeout that is not retried)
 */
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (;;) {
// handle notFullTimeoutRetry
DruidPooledConnection poolableConnection;
try {
// 1. Borrow a connection from the pool (see getConnectionInternal).
poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
// Retry a timed-out lookup while the retry counter has not passed notFullTimeoutRetryCount
// and the pool is not full; otherwise rethrow.
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled()) {
LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
}
continue;
}
throw ex;
}
// 2. testOnBorrow: validate the connection on every borrow.
if (testOnBorrow) {
// 2.1 Validate the connection.
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
// 2.2 Discard the invalid connection and borrow another one.
discardConnection(poolableConnection.holder);
continue;
}
} else {
if (poolableConnection.conn.isClosed()) {
discardConnection(poolableConnection.holder); // connection already closed: discard the holder and retry
continue;
}
// 3. testWhileIdle: validate only when the connection has been idle for at least
//    timeBetweenEvictionRunsMillis.
if (testWhileIdle) {
final DruidConnectionHolder holder = poolableConnection.holder;
long currentTimeMillis = System.currentTimeMillis();
long lastActiveTimeMillis = holder.lastActiveTimeMillis;
long lastExecTimeMillis = holder.lastExecTimeMillis;
long lastKeepTimeMillis = holder.lastKeepTimeMillis;
// The idle period is measured from the most relevant of: last active, last execute
// (when checkExecuteTime is on) and last keep-alive time.
if (checkExecuteTime
&& lastExecTimeMillis != lastActiveTimeMillis) {
lastActiveTimeMillis = lastExecTimeMillis;
}
if (lastKeepTimeMillis > lastActiveTimeMillis) {
lastActiveTimeMillis = lastKeepTimeMillis;
}
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
// Fall back to the default interval when none is configured.
if (timeBetweenEvictionRunsMillis <= 0) {
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
}
if (idleMillis >= timeBetweenEvictionRunsMillis
|| idleMillis < 0 // unexcepted branch
) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
}
}
}
// 4. removeAbandoned: record the borrow stack trace and time and track the connection in
//    activeConnections (presumably scanned later by removeAbandoned() — confirm).
if (removeAbandoned) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try {
activeConnections.put(poolableConnection, PRESENT);
} finally {
activeConnectionLock.unlock();
}
}
// Apply the configured auto-commit default when it is "off".
if (!this.defaultAutoCommit) {
poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
5、循环获取线程池可用连接
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
if (closed) {
connectErrorCountUpdater.incrementAndGet(this);
throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
}
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
final int maxWaitThreadCount = this.maxWaitThreadCount;
DruidConnectionHolder holder;
for (boolean createDirect = false;;) {
//1、是否直接创建连接 createDirect默认为false
if (createDirect) {
createStartNanosUpdater.set(this, System.nanoTime());
if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
PhysicalConnectionInfo pyConnInfo = DruidDataSource.this.createPhysicalConnection();
holder = new DruidConnectionHolder(this, pyConnInfo);
holder.lastActiveTimeMillis = System.currentTimeMillis();
creatingCountUpdater.decrementAndGet(this);
directCreateCountUpdater.incrementAndGet(this);
if (LOG.isDebugEnabled()) {
LOG.debug("conn-direct_create ");
}
boolean discard = false;
lock.lock();
try {
if (activeCount < maxActive) {
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
break;
} else {
discard = true;
}
} finally {
lock.unlock();
}
if (discard) {
JdbcUtils.close(pyConnInfo.getPhysicalConnection());
}
}
}
//2、加锁
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
try {
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength());
}
if (onFatalError
&& onFatalErrorMaxActive > 0
&& activeCount >= onFatalErrorMaxActive) {
connectErrorCountUpdater.incrementAndGet(this);
StringBuilder errorMsg = new StringBuilder();
errorMsg.append("onFatalError, activeCount ")
.append(activeCount)
.append(", onFatalErrorMaxActive ")
.append(onFatalErrorMaxActive);
if (lastFatalErrorTimeMillis > 0) {
errorMsg.append(", time '")
.append(StringUtils.formatDateTime19(
lastFatalErrorTimeMillis, TimeZone.getDefault()))
.append("'");
}
if (lastFatalErrorSql != null) {
errorMsg.append(", sql
")
.append(lastFatalErrorSql);
}
throw new SQLException(
errorMsg.toString(), lastFatalError);
}
connectCount++;
//3、如果没有空闲线程,且线程为达到设置最大线程数,设置直接创建线程 createDirect = true,重新循环
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true;
continue;
}
}
//4、创建连接
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
if (holder != null) {
if (holder.discard) {
continue;
}
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
}
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException(e.getMessage(), e);
} catch (SQLException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw e;
} finally {
lock.unlock();
}
break;
}
if (holder == null) {
long waitNanos = waitNanosLocal.get();
StringBuilder buf = new StringBuilder(128);
buf.append("wait millis ")//
.append(waitNanos / (1000 * 1000))//
.append(", active ").append(activeCount)//
.append(", maxActive ").append(maxActive)//
.append(", creating ").append(creatingCount)//
;
if (creatingCount > 0 && createStartNanos > 0) {
long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000);
if (createElapseMillis > 0) {
buf.append(", createElapseMillis ").append(createElapseMillis);
}
}
if (createErrorCount > 0) {
buf.append(", createErrorCount ").append(createErrorCount);
}
List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList();
for (int i = 0; i < sqlList.size(); ++i) {
if (i != 0) {
buf.append('
');
} else {
buf.append(", ");
}
JdbcSqlStatValue sql = sqlList.get(i);
buf.append("runningSqlCount ").append(sql.getRunningCount());
buf.append(" : ");
buf.append(sql.getSql());
}
String errorMessage = buf.toString();
if (this.createError != null) {
throw new GetConnectionTimeoutException(errorMessage, createError);
} else {
throw new GetConnectionTimeoutException(errorMessage);
}
}
holder.incrementUseCount();
DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
return poolalbeConnection;
}
3、使用连接
1.3、DataSourceUtils工具类获取连接包装成ConnectionHolder并使用
1、JdbcTemplate 执行查询
/**
 * Executes a prepared-statement query: binds parameters with {@code pss}, runs the statement
 * and lets {@code rse} turn the result set into the return value.
 *
 * @param psc creates the PreparedStatement
 * @param pss binds the statement parameters; may be {@code null} when there is nothing to bind
 * @param rse converts the ResultSet into the result object; must not be {@code null}
 * @return whatever {@code rse.extractData} returns
 * @throws DataAccessException if {@code execute} reports a data-access failure
 */
public <T> T query(
PreparedStatementCreator psc, final PreparedStatementSetter pss, final ResultSetExtractor<T> rse)
throws DataAccessException {
Assert.notNull(rse, "ResultSetExtractor must not be null");
logger.debug("Executing prepared SQL query");
// execute(...) supplies the statement; the callback below does the per-statement work.
return execute(psc, new PreparedStatementCallback<T>() {
@Override
public T doInPreparedStatement(PreparedStatement ps) throws SQLException {
ResultSet rs = null;
try {
if (pss != null) {
pss.setValues(ps);
}
// With Druid as the pool this call reaches DruidPooledPreparedStatement.executeQuery().
rs = ps.executeQuery();
ResultSet rsToUse = rs;
if (nativeJdbcExtractor != null) {
rsToUse = nativeJdbcExtractor.getNativeResultSet(rs);
}
return rse.extractData(rsToUse);
}
finally {
// Always close the result set and let the setter release its parameters.
JdbcUtils.closeResultSet(rs);
if (pss instanceof ParameterDisposer) {
((ParameterDisposer) pss).cleanupParameters();
}
}
}
});
}
2、DruidPooledPreparedStatement执行sql任务
@Override
/**
 * Runs the underlying statement's query and wraps the result in a DruidPooledResultSet.
 *
 * @return the wrapped result set, or {@code null} when the underlying statement returned {@code null}
 * @throws SQLException the exception produced by {@code checkException} for any failure
 */
public ResultSet executeQuery() throws SQLException {
// Pre-execution bookkeeping; the calls run in this fixed order before the statement executes.
checkOpen();
incrementExecuteQueryCount();
transactionRecord(sql);
oracleSetRowPrefetch();
conn.beforeExecute();
try {
ResultSet rs = stmt.executeQuery();
if (rs == null) {
return null;
}
// Wrap the raw result set and register it with this statement's result-set trace.
DruidPooledResultSet poolableResultSet = new DruidPooledResultSet(this, rs);
addResultSetTrace(poolableResultSet);
return poolableResultSet;
} catch (Throwable t) {
errorCheck(t);
throw checkException(t);
} finally {
// Runs on success and on failure, pairing with beforeExecute() above.
conn.afterExecute();
}
}
4、归还连接
4.1、监控归还
1、JdbcTemplate类execute方法中调用JdbcUtils.closeStatement(ps) ,通过PreparedStatement关闭(PreparedStatement父类close方法关闭)
2、Statement类close()方法
3、DruidPooledPreparedStatement调用close()
4、DruidPooledConnection类调用closePoolableStatement()
1、DruidPooledConnection类调用closePoolableStatement()
/**
 * Closes a prepared statement handed out by this connection: depending on the statement's
 * state it is put back into the statement pool, removed from that pool, or really closed.
 *
 * @param stmt the pooled prepared statement being closed
 * @throws SQLException if really closing the statement fails, or if checking the underlying
 *         connection's state fails after a clearParameters error
 */
public void closePoolableStatement(DruidPooledPreparedStatement stmt) throws SQLException {
PreparedStatement rawStatement = stmt.getRawPreparedStatement();
final DruidConnectionHolder holder = this.holder;
// Nothing to do when this connection no longer has a holder.
if (holder == null) {
return;
}
if (stmt.isPooled()) {
try {
rawStatement.clearParameters();
} catch (SQLException ex) {
this.handleException(ex, null);
// Give up silently when the underlying connection is already closed.
if (rawStatement.getConnection().isClosed()) {
return;
}
LOG.error("clear parameter error", ex);
}
}
PreparedStatementHolder stmtHolder = stmt.getPreparedStatementHolder();
stmtHolder.decrementInUseCount();
// Case 1: pooled statement without errors — return it to the statement pool.
if (stmt.isPooled() && holder.isPoolPreparedStatements() && stmt.exceptionCount == 0) {
holder.getStatementPool().put(stmtHolder);
stmt.clearResultSet();
holder.removeTrace(stmt);
stmtHolder.setFetchRowPeak(stmt.getFetchRowPeak());
stmt.setClosed(true); // soft set close
} else if (stmt.isPooled() && holder.isPoolPreparedStatements()) {
// the PreparedStatement threw an exception
// Case 2: pooled statement that saw errors — drop it from the statement pool.
stmt.clearResultSet();
holder.removeTrace(stmt);
holder.getStatementPool()
.remove(stmtHolder);
} else {
try {
//Connection behind the statement may be in invalid state, which will throw a SQLException.
//In this case, the exception is desired to be properly handled to remove the unusable connection from the pool.
// Case 3: statement is not pooled — really close it via closeInternal().
stmt.closeInternal();
} catch (SQLException ex) {
this.handleException(ex, null);
throw ex;
} finally {
holder.getDataSource().incrementClosedPreparedStatementCount();
}
}
}
2、DruidPooledStatement类close关闭连接
@Override
/**
 * Closes this statement once: clears its result sets, closes the wrapped statement,
 * marks this wrapper as closed and removes it from the connection holder's trace.
 * Calling it again after that is a no-op.
 *
 * @throws SQLException if closing the wrapped statement fails
 */
public void close() throws SQLException {
    if (!this.closed) {
        clearResultSet();

        if (stmt != null) {
            stmt.close();
        }
        this.closed = true;

        DruidConnectionHolder owner = conn.getConnectionHolder();
        if (owner != null) {
            owner.removeTrace(this);
        }
    }
}
3、监控回收 StatementProxyImpl类
@Override
/**
 * Closes the proxied statement by sending the close through a filter chain, then hands the
 * chain back for reuse. Does nothing when there is no underlying statement.
 *
 * @throws SQLException if a filter or the underlying statement fails to close
 */
public void close() throws SQLException {
    if (this.statement != null) {
        FilterChainImpl filterChain = createChain();
        filterChain.statement_close(this);
        recycleFilterChain(filterChain);
    }
}
4.2、连接归还
1、JdbcTemplate类execute方法中调用DataSourceUtils.releaseConnection(con, getDataSource())
2、DataSourceUtils类doReleaseConnection(con, dataSource);
3、DataSourceUtils类doCloseConnection(con, dataSource);
4、DataSourceUtils类con.close();(DruidPooledConnection类实现close方法)
5、DruidPooledConnection类syncClose()
6、DruidPooledConnection类recycle()
7、DruidPooledConnection类dataSource.recycle()
8、DruidDataSource类recycle()
/**
 * Returns a borrowed connection to the pool. The connection is rolled back if needed, its
 * holder is reset, and it is then either put back at the tail of the pool or dropped
 * (discarded holder, use-count or lifetime limit reached, already closed, failed
 * testOnReturn check, data source disabled).
 * Any Throwable raised along the way is logged and counted, not rethrown.
 *
 * @param pooledConnection the connection being handed back
 * @throws SQLException declared by the signature; failures inside the main try block are caught
 */
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
final DruidConnectionHolder holder = pooledConnection.holder;
if (holder == null) {
LOG.warn("connectionHolder is null");
return;
}
// Optionally warn when the connection is closed by a thread other than its owner.
if (logDifferentThread //
&& (!isAsyncCloseConnectionEnable()) //
&& pooledConnection.ownerThread != Thread.currentThread()//
) {
LOG.warn("get/close not same thread");
}
final Connection physicalConnection = holder.conn;
// Stop tracking the connection in activeConnections; traceEnable is re-checked under the lock.
if (pooledConnection.traceEnable) {
Object oldInfo = null;
activeConnectionLock.lock();
try {
if (pooledConnection.traceEnable) {
oldInfo = activeConnections.remove(pooledConnection);
pooledConnection.traceEnable = false;
}
} finally {
activeConnectionLock.unlock();
}
if (oldInfo == null) {
if (LOG.isWarnEnabled()) {
LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size());
}
}
}
final boolean isAutoCommit = holder.underlyingAutoCommit;
final boolean isReadOnly = holder.underlyingReadOnly;
final boolean testOnReturn = this.testOnReturn;
try {
// check need to rollback?
// 1. Roll back when the connection is neither auto-commit nor read-only.
if ((!isAutoCommit) && (!isReadOnly)) {
pooledConnection.rollback();
}
// reset holder, restore default settings, clear warnings
boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread();
if (!isSameThread) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
// 2. Reset the holder; when another thread owns the connection, do it under the connection's own lock.
try {
holder.reset();
} finally {
lock.unlock();
}
} else {
holder.reset();
}
// 3. Early exits: the holder is already discarded, or it reached the physical use-count limit.
if (holder.discard) {
return;
}
if (phyMaxUseCount > 0 && holder.useCount >= phyMaxUseCount) {
discardConnection(holder);
return;
}
// The physical connection is already closed: only fix up the counters.
if (physicalConnection.isClosed()) {
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return;
}
// 4. testOnReturn: validate the connection before putting it back; close it when invalid.
if (testOnReturn) {
boolean validate = testConnectionInternal(holder, physicalConnection);
if (!validate) {
JdbcUtils.close(physicalConnection);
destroyCountUpdater.incrementAndGet(this);
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return;
}
}
// Restore the schema recorded on the holder, if any.
if (holder.initSchema != null) {
holder.conn.setSchema(holder.initSchema);
holder.initSchema = null;
}
if (!enable) {
discardConnection(holder);
return;
}
boolean result;
final long currentTimeMillis = System.currentTimeMillis();
// Drop connections that have lived longer than phyTimeoutMillis.
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - holder.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
discardConnection(holder);
return;
}
}
// 5. The connection is usable: put its holder back at the tail of the pool.
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally {
lock.unlock();
}
// 6. putLast refused the holder: close the physical connection instead.
if (!result) {
JdbcUtils.close(holder.conn);
LOG.info("connection recyle failed.");
}
} catch (Throwable e) {
// Any failure: clear the statement cache, discard the holder once, log and count the error.
holder.clearStatementCache();
if (!holder.discard) {
discardConnection(holder);
holder.discard = true;
}
LOG.error("recyle error", e);
recycleErrorCountUpdater.incrementAndGet(this);
}
}
5、回收线程
1、创建异步销毁任务(timeBetweenEvictionRunsMillis<=0时,执行周期默认为1s)
/**
 * Creates the destroy task and starts running it: on {@code destroyScheduler} at a fixed rate
 * when a scheduler is configured, otherwise on a dedicated DestroyConnectionThread.
 */
protected void createAndStartDestroyThread() {
    destroyTask = new DestroyTask();

    // No scheduler: run the destroy work on its own named thread.
    if (destroyScheduler == null) {
        String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
        destroyConnectionThread = new DestroyConnectionThread(threadName);
        destroyConnectionThread.start();
        return;
    }

    // Scheduler available: use timeBetweenEvictionRunsMillis as the period, or 1000 ms when unset.
    long configured = timeBetweenEvictionRunsMillis;
    long intervalMillis = configured > 0 ? configured : 1000;
    destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, intervalMillis, intervalMillis,
            TimeUnit.MILLISECONDS);
    // In this branch the latch is counted down here, right after scheduling.
    initedLatch.countDown();
}
2、销毁任务执行器
/**
 * Periodic maintenance task: shrinks the pool on every run and, when removeAbandoned is
 * enabled, also runs the abandoned-connection cleanup.
 */
public class DestroyTask implements Runnable {

    public DestroyTask() {
    }

    @Override
    public void run() {
        // Clean up connections held in the pool (keepAlive is passed through to shrink).
        shrink(true, keepAlive);

        if (!isRemoveAbandoned()) {
            return;
        }
        // Presumably reclaims connections that were borrowed for too long — confirm in removeAbandoned().
        removeAbandoned();
    }
}