基础支持层——DataSource

MyBatis基础支持层位于 Mybatis 整体架构的最底层,支撑着 Mybatis 的核心处理层,是整个框架的基石。基础支持层中封装了多个较为通用的、独立的模块,不仅仅为 Mybatis 提供基础支撑,也可以在合适的场景中直接复用。

整体架构

这篇文章介绍MyBatis的DataSource模块

在数 据持久层 中,数 据源是一个 非常重要的组 件,其性能直接关 系到整个 数 据持久层 的性能。

在实 践 中比较 常见 的第三方数 据源组 件有ApacheCommonDBCPC3P0Proxool等,MyBatis不仅 可以集成第三方数 据源组 件,还 提供了自己的数 据源实 现 。

常见 的数 据源组 件都实 现 了 javax.sql.DataSource接口,MyBatis自身实 现 的数 据源实 现 也 不 例 外 。

MyBatis 提 供 了 两 个 javax.sql.DataSource 接 口 实 现 , 分 别 是 PooledDataSourceUnpooledDataSource

Mybatis使 用 不 同 的 DataSourceFactory接 口 实 现 创 建 不 同 类 型 的 DataSource,如图所示,这 是工厂 方法模式的一个 典型应 用。

image-20200608220639013

工厂方法模式

在工厂 方法模式中,定义 了一个 用于创 建对 象的工厂 接口,并 根据工厂 接口的具体 实 现 类 决定具体实例化哪一个具体产品类。首先来看工厂方法模式的UML图,从整体上了解该模式 的结 构 。

image-20200608221034019

工厂方法有四个角色构成:

  • 工厂接口(Factory)

    工厂 接口是工厂 方法模式的核心接口,调 用者会 直接与 工厂 接 口交互用于获取具体的产品实现类

  • 具体工厂类(ConcreteFactory)

    具体 工厂 类 是工厂 接口的实 现 类 ,用于实 例化产 品 对象,不同的具体工厂类会根据需求实例化不同的产品实现类。

  • 产品接口(Product)

    品接口用于定义 产 品类 的功能,具体 工厂 类 产 生的所有产 品对象都必须实现该接口。调用者一般会面向产品接口进行编程,所以产品接口会与调用者直接交互,也是调 用者最为 关 心的接口。

  • 具体 产 品类 (ConcreteProduct)

    现 产 品接口的实 现 类 ,具体 产 品类 中定义 了具体的业务逻辑

如果需要产 生新的产 品,例如对 于MyBatis的数 据源模块 来 说 ,就是添加新的第三方数 据 源组 件,只需要添加对 应 的工厂 实 现 类 ,新数 据源就可以被MyBatis使用,而不必修改己有的 代码 。显 然,工厂 方法模式符合“开 放-封闭 ”原则 。除此之外,工厂 方法会 向调 用者隐 藏具体 产 品类 的实 例化细 节 ,调 用者只需要了解工厂 接口和产 品接口,面向这 两 个 接口编 程即 可。

工厂 方法模式也是存在缺点的。在增加新产 品实 现 类 时 ,还 要提供一个 与 之对 应 的工厂 实 现 类 ,所以实 际 新增的类 是成对 出现 的,这 增加了系统 的复 杂 度。另 外,工厂 方法模式引入了 工厂 接口和产 品接口这 一层 抽象,调 用者面向该 抽象层 编 程,增加了程序的抽象性和理解难 度。

DataSourceFactory

在数 据源模块 中,DataSourceFactory接口扮演工厂 接口的角色。

UnpooledDataSourceFactoryPooledDataSourceFactory则 扮演着具体 工厂 类 的角色。

我们 从 DataSourceFactory接口开 始分 析,其定义 如下:

public interface DataSourceFactory {

// 设置DataSource的相关属性,一般紧跟在初始化完成之后
void setProperties(Properties props);

// 获取DataSource对象
DataSource getDataSource();

}

UnpooledDataSourceFactory的 构 造 函 数 中 会 直 接 创 建 UnpooledDataSource对 象 ,并 初始 化 UnpooledDataSourceFactory.dataSource 字 段 。UnpooledDataSourceFactory.setProperties()方法会 完成对 UnpooledDataSource对 象的配置,代码 如下:

public void setProperties(Properties properties) {
Properties driverProperties = new Properties();
// 创建DataSource对应的MetaObject
MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
// 遍历properties集合,该集合中配置了数据源需要的信息
for (Object key : properties.keySet()) {
String propertyName = (String) key;
// 以"driver."开头的配置项是对DateSource的配置,记录到driverProperties中保存
if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
String value = properties.getProperty(propertyName);
driverProperties.setProperty(
propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
} else if (metaDataSource.hasSetter(propertyName)) {
String value = (String) properties.get(propertyName);
Object convertedValue = convertValue(metaDataSource, propertyName, value);
metaDataSource.setValue(propertyName, convertedValue);
} else {
throw new DataSourceException("...");
}
}
if (driverProperties.size() > 0) {
metaDataSource.setValue("driverProperties", driverProperties);
}
}

UnpooledDataSourceFactory.getDataSource()方法实 现 比较 简 单 ,它 直接返回 dataSource字段 记 录 的 UnpooledDataSource 对 象 。

PooledDataSourceFactory 继 承 了 UnpooledDataSourceFactory, 但 并 没 有 覆 盖 setProperties() 方法和getDataSource()方法。两 者唯一的区 别 是PooledDataSourceFactory的构 造函数 会 将 其 dataSource 字 段 初 始 化 为 PooledDataSource 对 象 。

JndiDataSourceFactory是依赖 JNDI服务 从 容器中获 取用户 配置的DataSource,其逻 辑 并 不 复 杂 。

UnpooledDataSource

javax.sql.DataSource接口在数 据源模块 中扮演了产 品接口的角色,MyBatis提供了两 个 DataSource接 口 的 实 现 类 ,分 别 是 UnpooledDataSourcePooledDataSource,它 们 扮 演 着 具 体 产 品类 的角色。

UnpooledDataSource 实 现 了 javax.sql.DataSource 接 口 中 定 义 的 getConnection()方 法 及 其 重 载方法,用于获 取数 据库 连 接。

每次通过 UnpooledDataSource.getConnection()方法获 取数 据库 连 接 时 都会 创 建一个 新连 接。

UnpooledDataSource中的字段如下,每个 字段都有对 应 的getter/setter 方法:

public class UnpooledDataSource implements DataSource {

private ClassLoader driverClassLoader;
private Properties driverProperties;
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();

private String driver;
private String url;
private String username;
private String password;

private Boolean autoCommit;
private Integer defaultTransactionIsolationLevel;

static {
Enumeration<Driver> drivers = DriverManager.getDrivers();
while (drivers.hasMoreElements()) {
Driver driver = drivers.nextElement();
registeredDrivers.put(driver.getClass().getName(), driver);
}
}
// ...
}

Pooled DataSource

了解JDBC编 程的读 者知道,数 据库 连 接的创 建过 程是非常耗时 的,数 据库 能够 建立的连 接数 也非常有限,所以在绝 大多数 系统 中,数 据库 连 接是非常珍贵 的资 源,使用数 据库 连 接池就显得尤为必要。

使用数据库连接池会带来很多好处,例如,可以实现数据库连接的重用、提高响 应 速度、防止数 据库 连 接过 多造成数 据库 假死、避免数 据库 连 接泄露等。

数据库连接池在初始化时,一般会创建一定数量的数据库连接并添加到连接池中备用。

当 程序需要使用数 据库 连 接时 ,从 池中请 求连 接;当 程序不再使用该 连 接时 ,会 将 其返回到池中 缓 存,等待下次使用,而不是直接关 闭 。

当 然,数 据库 连 接池会 控制连 接总 数 的上限以及空闲 连 接数 的上限,如果连 接池创 建的总 连 接数 己达 到上限,且都已被占用,则 后续 请 求连 接的线 程会 进 入阻塞队 列等待,直到有线 程释 放出可用的连 接。

如果连 接池中空闲 连 接数 较 多,达 到 其上限,则 后续 返回的空闲 连 接不会 放入池中,而是直接关 闭 ,这 样 可以减 少系统 维 护 多余数 据库连接的开销。

  • 如果将总连接数的上限设置得过大,可能因连接数过多而导致数据库僵死,系统整体性能 下降;
  • 如果总连接数上限过小,则无法完全发挥数据库的性能,浪费数据库资源。如果将空闲 连接的上限设置得过大,则会浪费系统资源来维护这些空闲连接;
  • 如果空闲连接上限过小,当 出现 瞬间 的峰值 请 求时 ,系统 的快速响 应 能力就比较 弱。

所以在设 置数 据库 连 接池的这 两 个 值 时,需要进行性能测试、权衡以及一些经验。

PooledDataSource实 现 了简 易数 据库 连 接池的功能,它 依赖 的组 件如图 所示,其中需 要注意的是,PooledDataSource创 建新数 据库 连 接的功能是依赖 其中封装 的UnpooledDataSource 对象实现的。

image-20200609115725188

PooledConnection

PooledDataSource并 不会 直接管理java.sql.Connection对 象,而是管理 PooledConnection对 象。

PooledConnection中封装 了真 正的数 据库 连 接对 象(java.sql.Connection) 以及其代理对 象,这 里的代理对 象是通过 JDK动 态 代理产 生的。PooledConnection继 承了 InvocationHandler 接口。

核心字段:

// 记录当前PooledConnection对 象所在的 PooledDataSource对 象。
// 该 PooledConnection是从该 PooledDataSource中获 取的;
// 当 调 用close() 方法时 会 将 PooledConnection放回该PooledDataSource 中
private final PooledDataSource dataSource;
// 真正的数据库连接
private final Connection realConnection;
// 数据库连接代理对象
private final Connection proxyConnection;
// 从连接池中取出该连接的时间戳
private long checkoutTimestamp;
// 创建该连接的时间戳
private long createdTimestamp;
// 最后一次使用的时间戳
private long lastUsedTimestamp;
// 由数据库URL、用户名和密码计算出来的hash值,可用于标识该连接所在的连接池
private int connectionTypeCode;
// 检测当前PooledConnection是否有效,主要是为了防止程序通过close()方法将连接还给连接池之后
// 依然通过该连接操作数据库
private boolean valid;

PooledConnection.invoke()方法的实 现 ,该 方法是proxyConnection这 个 连 接代理对 象的真 正代理 逻 辑 ,它 会 对 close()方法的调 用进 行代理,并 且在调 用真 正数 据库 连 接的方法之前进 行检 测 , 代码 如下:

public Object invoke(Object proxy, Method method, Object[] args) 
throws Throwable {
String methodName = method.getName();
// 如果调用close()方法,则将其重新放入连接池,而不是真正关闭数据库连接
if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
dataSource.pushConnection(this);
return null;
} else {
try {
if (!Object.class.equals(method.getDeclaringClass())) {
// issue #579 toString() should never fail
// throw an SQLException instead of a Runtime
// 通过valid字段检测连接是否有效
checkConnection();
}
// 调用真正数据库连接对象对应的方法
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
}

PoolState

PoolState是 用 于 管 理 PooledConnection对 象 状 态 的 组 件 ,它 通 过 两 个 ArrayList <PooledConnection>集合分别 管理空闲 状 态 的连 接和活跃 状 态 的连 接,定义 如下:

// 空闲的PooledConnection对象集合
protected final List<PooledConnection> idleConnections =
new ArrayList<PooledConnection>();
// 活跃的PooledConnection集合
protected final List<PooledConnection> activeConnections =
new ArrayList<PooledConnection>();
// 请求数据库连接的次数
protected long requestCount = 0;
// 获取连接的累积时间
protected long accumulatedRequestTime = 0;
// checkoutTime表示应用从连接池中取出连接,到归还的这段时长
// accumulatedCheckoutTime记录了所有连接累积的checkoutTime时长
protected long accumulatedCheckoutTime = 0;
// 当连接长时间未归还给连接池的时候,会被认该连接超时
// claimedOverdueConnectionCount记录了超时连接个数
protected long claimedOverdueConnectionCount = 0;
// 累积超时时间
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
// 累积等待时间
protected long accumulatedWaitTime = 0;
// 累积等待次数
protected long hadToWaitCount = 0;
// 无效的连接数
protected long badConnectionCount = 0;

PooledDataSource

PooledDataSource中 管 理 的 真 正 的 数 据 库 连 接 对 象 是 由 PooledDataSource中封装 的UnpooledDataSource对 象 创 建 的 ,并 由 PoolState管 理 所 有 连 接 的 状 态 。

PooledDataSource中核心字段的含义 和功能如下:

// 通过 PoolState管理连接池的状态并记录统计信息
private final PoolState state = new PoolState(this);

// 记录UnpooledDataSource对象,用于生成真实的数据库连接对象,构造函数中会初始化该字段
private final UnpooledDataSource dataSource;

// 最大活跃连接数
protected int poolMaximumActiveConnections = 10;
// 最大空闲连接数
protected int poolMaximumIdleConnections = 5;
// 最大checkout时长
protected int poolMaximumCheckoutTime = 20000;
// 最大等待时间
protected int poolTimeToWait = 20000;
// 最大无效连接数量
protected int poolMaximumLocalBadConnectionTolerance = 3;
// 在检测一个数据库连接是否可用的时候,会给数据库发送一个测试SQL语句
protected String poolPingQuery = "NO PING QUERY SET";

protected boolean poolPingEnabled;

// 当连接超过 poolPingConnectionsNotUsedFor毫秒未使用时 ,会发送一次测试SQL语句,检测连接是否正常
protected int poolPingConnectionsNotUsedFor;

// 根据数据库的URL、用户名和密码生成的一个hash值,该哈希值用于标志着当前的连接池,在构造函数中初始化
private int expectedConnectionTypeCode;

PooledDataSource.getConnection()方 法 首 先 会 调 用 PooledDataSource.popConnection()方 法 获 取 PooledConnection 对 象,然后通过 PooledConnection.getProxyConnection()方法获 取数 据库 连 接的代理对 象。popConnection()方法是PooledDataSource的核心逻 辑 之一,其具体 逻 辑 如图。

image-20200609150810073

PooledDataSource.popConnection()方法的具体实现:

private PooledConnection popConnection(String username, String password) 
throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
int localBadConnectionCount = 0;

while (conn == null) {
synchronized (state) {
if (!state.idleConnections.isEmpty()) {
// Pool has available connection
conn = state.idleConnections.remove(0);
if (log.isDebugEnabled()) {
log.debug("...");
}
} else {
// Pool does not have available connection
if (state.activeConnections.size() < poolMaximumActiveConnections) {
// Can create new connection
conn = new PooledConnection(dataSource.getConnection(), this);
if (log.isDebugEnabled()) {
log.debug("...");
}
} else {
// Cannot create new connection
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
if (longestCheckoutTime > poolMaximumCheckoutTime) {
// Can claim overdue connection
state.claimedOverdueConnectionCount++;
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
state.accumulatedCheckoutTime += longestCheckoutTime;
state.activeConnections.remove(oldestActiveConnection);
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
try {
oldestActiveConnection.getRealConnection().rollback();
} catch (SQLException e) {
log.debug("Bad connection. Could not roll back");
}
}
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
oldestActiveConnection.invalidate();
if (log.isDebugEnabled()) {
log.debug("....");
}
} else {
// Must wait
try {
if (!countedWait) {
state.hadToWaitCount++;
countedWait = true;
}
if (log.isDebugEnabled()) {
log.debug("...");
}
long wt = System.currentTimeMillis();
state.wait(poolTimeToWait);
state.accumulatedWaitTime += System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
break;
}
}
}
}
if (conn != null) {
// ping to server and check the connection is valid or not
if (conn.isValid()) {
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
state.activeConnections.add(conn);
state.requestCount++;
state.accumulatedRequestTime += System.currentTimeMillis() - t;
} else {
if (log.isDebugEnabled()) {
log.debug("...");
}
state.badConnectionCount++;
localBadConnectionCount++;
conn = null;
if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
if (log.isDebugEnabled()) {
log.debug("...");
}
throw new SQLException("...");
}
}
}
}

}

if (conn == null) {
if (log.isDebugEnabled()) {
log.debug("...");
}
throw new SQLException("...");
}

return conn;
}

通 过 前 面 对 PooledConnection.invoke()方法的分析我们 知道,当 调 用连 接的代理对 象的 close()方 法 时 ,并未关闭真正的数据连接 ,而是调用PooledDataSource.pushConnection()方法将 PooledConnection对 象归 还 给 连 接池,供之后重用。

PooledDataSource.pushConnection()方法也是 PooledDataSource的核心逻 辑 之一,其逻 辑 如图

image-20200609151224380

PooledDataSource.pushConnection()代码如下:

protected void pushConnection(PooledConnection conn) throws SQLException {

synchronized (state) {
state.activeConnections.remove(conn);
if (conn.isValid()) {
if (state.idleConnections.size() < poolMaximumIdleConnections
&& conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
// 回滚未提交的事务
conn.getRealConnection().rollback();
}
PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
state.idleConnections.add(newConn);
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
conn.invalidate();
if (log.isDebugEnabled()) {
log.debug("...");
}
state.notifyAll();
} else { // 空闲连接数已达到上限或PooledConnection对象并不属于该连接池
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.getRealConnection().close();
if (log.isDebugEnabled()) {
log.debug("...");
}
conn.invalidate();
}
} else {
if (log.isDebugEnabled()) {
log.debug("...");
}
// 统计无效PooledConnection对象个数
state.badConnectionCount++;
}
}
}

这里需要注意的是,PooledDataSource.pushConnection()方法和popConnection()方法中都调 用了 PooledConnection.isValid()方 法 来 检 测 PooledConnection的 有 效 性 , 该 方 法 除 了 检 测 PooledConnection.valid 字段的值 ,还 会 调 用 PooledDataSource.pingConnection()方法尝 试 让 数 据 库 执 行podPingQuery字段中记 录 的测 试 SQL语 句,从 而检 测 真 正的数 据库 连 接对 象是否依然 可以正常使用。

public boolean isValid() {
return valid && realConnection != null && dataSource.pingConnection(this);
}

protected boolean pingConnection(PooledConnection conn) {
boolean result = true;

try {
result = !conn.getRealConnection().isClosed();
} catch (SQLException e) {
if (log.isDebugEnabled()) {
log.debug("...");
}
result = false;
}

if (result) {
if (poolPingEnabled) {
// 检测poolPingEnabled设置,是否运行执行测试SQL语 句
// 长时间(超过 poolPingConnectionsNotUsedFor指定的时长)未使用的连接,才需要ping
// 操作来检测数据库连接是否正常
if (poolPingConnectionsNotUsedFor >= 0
&& conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
try {
if (log.isDebugEnabled()) {
log.debug("...");
}
Connection realConn = conn.getRealConnection();
Statement statement = realConn.createStatement();
ResultSet rs = statement.executeQuery(poolPingQuery);
rs.close();
statement.close();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
result = true;
if (log.isDebugEnabled()) {
log.debug("...");
}
} catch (Exception e) {
log.warn("...");
try {
conn.getRealConnection().close();
} catch (Exception e2) {
//ignore
}
result = false;
if (log.isDebugEnabled()) {
log.debug("...");
}
}
}
}
}
return result;
}

最后需要注意的是PooledDataSource.forceCloseAll()方法,当 修改PooledDataSource的字段 时 ,例如数 据库 URL用户名密码autoCommit配置等,都会 调 用forceCloseAll()方法将 所 有数 据库 连 接关 闭 ,同时 也会 将 所有相应 的PooledConnectiori对 象都设 置为 无效,清 空 activeConnections 集 合 和 idleConnections 集 合 。

应用系统之后通过PooledDataSource.getConnection()获取连接时,会按照新的配置重新配置新的数据库连接以及相应的PooledConnection对象。

public void forceCloseAll() {
synchronized (state) {
expectedConnectionTypeCode = assembleConnectionTypeCode(
dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
for (int i = state.activeConnections.size(); i > 0; i--) {
try {
PooledConnection conn = state.activeConnections.remove(i - 1);
conn.invalidate();

Connection realConn = conn.getRealConnection();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
realConn.close();
} catch (Exception e) {
// ignore
}
}
for (int i = state.idleConnections.size(); i > 0; i--) {
try {
PooledConnection conn = state.idleConnections.remove(i - 1);
conn.invalidate();

Connection realConn = conn.getRealConnection();
if (!realConn.getAutoCommit()) {
realConn.rollback();
}
realConn.close();
} catch (Exception e) {
// ignore
}
}
}
if (log.isDebugEnabled()) {
log.debug("PooledDataSource forcefully closed/removed all connections.");
}
}

参考

  • 《MyBatis技术内幕》

  • 部分图片来源——《MyBatis技术内幕》