基础支持层——DataSource

2019-10-28 书籍 《MyBatis技术内幕》

MyBatis基础支持层位于 Mybatis 整体架构的最底层,支撑着 Mybatis 的核心处理层,是整个框架的基石。基础支持层中封装了多个较为通用的、独立的模块,不仅仅为 Mybatis 提供基础支撑,也可以在合适的场景中直接复用。

整体架构

这篇文章介绍MyBatis的DataSource模块

在数 据持久层 中,数 据源是一个 非常重要的组 件,其性能直接关 系到整个 数 据持久层 的性能。

在实 践 中比较 常见 的第三方数 据源组 件有ApacheCommonDBCPC3P0Proxool等,MyBatis不仅 可以集成第三方数 据源组 件,还 提供了自己的数 据源实 现 。

常见 的数 据源组 件都实 现 了 javax.sql.DataSource接口,MyBatis自身实 现 的数 据源实 现 也 不 例 外 。

MyBatis 提 供 了 两 个 javax.sql.DataSource 接 口 实 现 , 分 别 是 PooledDataSourceUnpooledDataSource

Mybatis使 用 不 同 的 DataSourceFactory接 口 实 现 创 建 不 同 类 型 的 DataSource,如图所示,这 是工厂 方法模式的一个 典型应 用。

image-20200608220639013

工厂方法模式

在工厂 方法模式中,定义 了一个 用于创 建对 象的工厂 接口,并 根据工厂 接口的具体 实 现 类 决定具体实例化哪一个具体产品类。首先来看工厂方法模式的UML图,从整体上了解该模式 的结 构 。

image-20200608221034019

工厂方法有四个角色构成:

  • 工厂接口(Factory)

    工厂 接口是工厂 方法模式的核心接口,调 用者会 直接与 工厂 接 口交互用于获取具体的产品实现类

  • 具体工厂类(ConcreteFactory)

    具体 工厂 类 是工厂 接口的实 现 类 ,用于实 例化产 品 对象,不同的具体工厂类会根据需求实例化不同的产品实现类。

  • 产品接口(Product)

    品接口用于定义 产 品类 的功能,具体 工厂 类 产 生的所有产 品对象都必须实现该接口。调用者一般会面向产品接口进行编程,所以产品接口会与调用者直接交互,也是调 用者最为 关 心的接口。

  • 具体 产 品类 (ConcreteProduct)

    现 产 品接口的实 现 类 ,具体 产 品类 中定义 了具体的业务逻辑

如果需要产 生新的产 品,例如对 于MyBatis的数 据源模块 来 说 ,就是添加新的第三方数 据 源组 件,只需要添加对 应 的工厂 实 现 类 ,新数 据源就可以被MyBatis使用,而不必修改己有的 代码 。显 然,工厂 方法模式符合“开 放-封闭 ”原则 。除此之外,工厂 方法会 向调 用者隐 藏具体 产 品类 的实 例化细 节 ,调 用者只需要了解工厂 接口和产 品接口,面向这 两 个 接口编 程即 可。

工厂 方法模式也是存在缺点的。在增加新产 品实 现 类 时 ,还 要提供一个 与 之对 应 的工厂 实 现 类 ,所以实 际 新增的类 是成对 出现 的,这 增加了系统 的复 杂 度。另 外,工厂 方法模式引入了 工厂 接口和产 品接口这 一层 抽象,调 用者面向该 抽象层 编 程,增加了程序的抽象性和理解难 度。

DataSourceFactory

在数 据源模块 中,DataSourceFactory接口扮演工厂 接口的角色。

UnpooledDataSourceFactoryPooledDataSourceFactory则 扮演着具体 工厂 类 的角色。

我们 从 DataSourceFactory接口开 始分 析,其定义 如下:

public interface DataSourceFactory {

  // 设置DataSource的相关属性,一般紧跟在初始化完成之后
  void setProperties(Properties props);

  // 获取DataSource对象
  DataSource getDataSource();

}

UnpooledDataSourceFactory的 构 造 函 数 中 会 直 接 创 建 UnpooledDataSource对 象 ,并 初始 化 UnpooledDataSourceFactory.dataSource 字 段 。UnpooledDataSourceFactory.setProperties()方法会 完成对 UnpooledDataSource对 象的配置,代码 如下:

public void setProperties(Properties properties) {
  Properties driverProperties = new Properties();
  // 创建DataSource对应的MetaObject
  MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
  // 遍历properties集合,该集合中配置了数据源需要的信息
  for (Object key : properties.keySet()) {
    String propertyName = (String) key;
    // 以"driver."开头的配置项是对DateSource的配置,记录到driverProperties中保存
    if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
      String value = properties.getProperty(propertyName);
      driverProperties.setProperty(
        propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
    } else if (metaDataSource.hasSetter(propertyName)) {
      String value = (String) properties.get(propertyName);
      Object convertedValue = convertValue(metaDataSource, propertyName, value);
      metaDataSource.setValue(propertyName, convertedValue);
    } else {
      throw new DataSourceException("...");
    }
  }
  if (driverProperties.size() > 0) {
    metaDataSource.setValue("driverProperties", driverProperties);
  }
}

UnpooledDataSourceFactory.getDataSource()方法实 现 比较 简 单 ,它 直接返回 dataSource字段 记 录 的 UnpooledDataSource 对 象 。

PooledDataSourceFactory 继 承 了 UnpooledDataSourceFactory, 但 并 没 有 覆 盖 setProperties() 方法和getDataSource()方法。两 者唯一的区 别 是PooledDataSourceFactory的构 造函数 会 将 其 dataSource 字 段 初 始 化 为 PooledDataSource 对 象 。

JndiDataSourceFactory是依赖 JNDI服务 从 容器中获 取用户 配置的DataSource,其逻 辑 并 不 复 杂 。

UnpooledDataSource

javax.sql.DataSource接口在数 据源模块 中扮演了产 品接口的角色,MyBatis提供了两 个 DataSource接 口 的 实 现 类 ,分 别 是 UnpooledDataSourcePooledDataSource,它 们 扮 演 着 具 体 产 品类 的角色。

UnpooledDataSource 实 现 了 javax.sql.DataSource 接 口 中 定 义 的 getConnection()方 法 及 其 重 载方法,用于获 取数 据库 连 接。

每次通过 UnpooledDataSource.getConnection()方法获 取数 据库 连 接 时 都会 创 建一个 新连 接。

UnpooledDataSource中的字段如下,每个 字段都有对 应 的getter/setter 方法:

public class UnpooledDataSource implements DataSource {

  private ClassLoader driverClassLoader;
  private Properties driverProperties;
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();

  private String driver;
  private String url;
  private String username;
  private String password;

  private Boolean autoCommit;
  private Integer defaultTransactionIsolationLevel;

  static {
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }
  // ...
}

Pooled DataSource

了解JDBC编 程的读 者知道,数 据库 连 接的创 建过 程是非常耗时 的,数 据库 能够 建立的连 接数 也非常有限,所以在绝 大多数 系统 中,数 据库 连 接是非常珍贵 的资 源,使用数 据库 连 接池就显得尤为必要。

使用数据库连接池会带来很多好处,例如,可以实现数据库连接的重用、提高响 应 速度、防止数 据库 连 接过 多造成数 据库 假死、避免数 据库 连 接泄露等。

数据库连接池在初始化时,一般会创建一定数量的数据库连接并添加到连接池中备用。

当 程序需要使用数 据库 连 接时 ,从 池中请 求连 接;当 程序不再使用该 连 接时 ,会 将 其返回到池中 缓 存,等待下次使用,而不是直接关 闭 。

当 然,数 据库 连 接池会 控制连 接总 数 的上限以及空闲 连 接数 的上限,如果连 接池创 建的总 连 接数 己达 到上限,且都已被占用,则 后续 请 求连 接的线 程会 进 入阻塞队 列等待,直到有线 程释 放出可用的连 接。

如果连 接池中空闲 连 接数 较 多,达 到 其上限,则 后续 返回的空闲 连 接不会 放入池中,而是直接关 闭 ,这 样 可以减 少系统 维 护 多余数 据库连接的开销。

  • 如果将总连接数的上限设置得过大,可能因连接数过多而导致数据库僵死,系统整体性能 下降;
  • 如果总连接数上限过小,则无法完全发挥数据库的性能,浪费数据库资源。如果将空闲 连接的上限设置得过大,则会浪费系统资源来维护这些空闲连接;
  • 如果空闲连接上限过小,当 出现 瞬间 的峰值 请 求时 ,系统 的快速响 应 能力就比较 弱。

所以在设 置数 据库 连 接池的这 两 个 值 时,需要进行性能测试、权衡以及一些经验。

PooledDataSource实 现 了简 易数 据库 连 接池的功能,它 依赖 的组 件如图 所示,其中需 要注意的是,PooledDataSource创 建新数 据库 连 接的功能是依赖 其中封装 的UnpooledDataSource 对象实现的。

image-20200609115725188

PooledConnection

PooledDataSource并 不会 直接管理java.sql.Connection对 象,而是管理 PooledConnection对 象。

PooledConnection中封装 了真 正的数 据库 连 接对 象(java.sql.Connection) 以及其代理对 象,这 里的代理对 象是通过 JDK动 态 代理产 生的。PooledConnection继 承了 InvocationHandler 接口。

核心字段:

// 记录当前PooledConnection对 象所在的 PooledDataSource对 象。
// 该 PooledConnection是从该 PooledDataSource中获 取的;
// 当 调 用close() 方法时 会 将 PooledConnection放回该PooledDataSource 中
private final PooledDataSource dataSource;
// 真正的数据库连接
private final Connection realConnection;
// 数据库连接代理对象
private final Connection proxyConnection;
// 从连接池中取出该连接的时间戳
private long checkoutTimestamp;
// 创建该连接的时间戳
private long createdTimestamp;
// 最后一次使用的时间戳
private long lastUsedTimestamp;
// 由数据库URL、用户名和密码计算出来的hash值,可用于标识该连接所在的连接池
private int connectionTypeCode;
// 检测当前PooledConnection是否有效,主要是为了防止程序通过close()方法将连接还给连接池之后
// 依然通过该连接操作数据库
private boolean valid;

PooledConnection.invoke()方法的实 现 ,该 方法是proxyConnection这 个 连 接代理对 象的真 正代理 逻 辑 ,它 会 对 close()方法的调 用进 行代理,并 且在调 用真 正数 据库 连 接的方法之前进 行检 测 , 代码 如下:

public Object invoke(Object proxy, Method method, Object[] args) 
  throws Throwable {
  String methodName = method.getName();
  // 如果调用close()方法,则将其重新放入连接池,而不是真正关闭数据库连接
  if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
    dataSource.pushConnection(this);
    return null;
  } else {
    try {
      if (!Object.class.equals(method.getDeclaringClass())) {
        // issue #579 toString() should never fail
        // throw an SQLException instead of a Runtime
        // 通过valid字段检测连接是否有效
        checkConnection();
      }
      // 调用真正数据库连接对象对应的方法
      return method.invoke(realConnection, args);
    } catch (Throwable t) {
      throw ExceptionUtil.unwrapThrowable(t);
    }
  }
}

PoolState

PoolState是 用 于 管 理 PooledConnection对 象 状 态 的 组 件 ,它 通 过 两 个 ArrayList <PooledConnection>集合分别 管理空闲 状 态 的连 接和活跃 状 态 的连 接,定义 如下:

// 空闲的PooledConnection对象集合
protected final List<PooledConnection> idleConnections = 
  new ArrayList<PooledConnection>();
// 活跃的PooledConnection集合
protected final List<PooledConnection> activeConnections = 
  new ArrayList<PooledConnection>();
// 请求数据库连接的次数
protected long requestCount = 0;
// 获取连接的累积时间
protected long accumulatedRequestTime = 0;
// checkoutTime表示应用从连接池中取出连接,到归还的这段时长
// accumulatedCheckoutTime记录了所有连接累积的checkoutTime时长
protected long accumulatedCheckoutTime = 0;
// 当连接长时间未归还给连接池的时候,会被认该连接超时
// claimedOverdueConnectionCount记录了超时连接个数
protected long claimedOverdueConnectionCount = 0;
// 累积超时时间
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
// 累积等待时间
protected long accumulatedWaitTime = 0;
// 累积等待次数
protected long hadToWaitCount = 0;
// 无效的连接数
protected long badConnectionCount = 0;

PooledDataSource

PooledDataSource中 管 理 的 真 正 的 数 据 库 连 接 对 象 是 由 PooledDataSource中封装 的UnpooledDataSource对 象 创 建 的 ,并 由 PoolState管 理 所 有 连 接 的 状 态 。

PooledDataSource中核心字段的含义 和功能如下:

// 通过 PoolState管理连接池的状态并记录统计信息
private final PoolState state = new PoolState(this);

// 记录UnpooledDataSource对象,用于生成真实的数据库连接对象,构造函数中会初始化该字段
private final UnpooledDataSource dataSource;

// 最大活跃连接数
protected int poolMaximumActiveConnections = 10;
// 最大空闲连接数
protected int poolMaximumIdleConnections = 5;
// 最大checkout时长
protected int poolMaximumCheckoutTime = 20000;
// 最大等待时间
protected int poolTimeToWait = 20000;
// 最大无效连接数量
protected int poolMaximumLocalBadConnectionTolerance = 3;
// 在检测一个数据库连接是否可用的时候,会给数据库发送一个测试SQL语句
protected String poolPingQuery = "NO PING QUERY SET";

protected boolean poolPingEnabled;

// 当连接超过 poolPingConnectionsNotUsedFor毫秒未使用时 ,会发送一次测试SQL语句,检测连接是否正常
protected int poolPingConnectionsNotUsedFor;

// 根据数据库的URL、用户名和密码生成的一个hash值,该哈希值用于标志着当前的连接池,在构造函数中初始化
private int expectedConnectionTypeCode;

PooledDataSource.getConnection()方 法 首 先 会 调 用 PooledDataSource.popConnection()方 法 获 取 PooledConnection 对 象,然后通过 PooledConnection.getProxyConnection()方法获 取数 据库 连 接的代理对 象。popConnection()方法是PooledDataSource的核心逻 辑 之一,其具体 逻 辑 如图。

image-20200609150810073

PooledDataSource.popConnection()方法的具体实现:

private PooledConnection popConnection(String username, String password) 
  throws SQLException {
  boolean countedWait = false;
  PooledConnection conn = null;
  long t = System.currentTimeMillis();
  int localBadConnectionCount = 0;

  while (conn == null) {
    synchronized (state) {
      if (!state.idleConnections.isEmpty()) {
        // Pool has available connection
        conn = state.idleConnections.remove(0);
        if (log.isDebugEnabled()) {
          log.debug("...");
        }
      } else {
        // Pool does not have available connection
        if (state.activeConnections.size() < poolMaximumActiveConnections) {
          // Can create new connection
          conn = new PooledConnection(dataSource.getConnection(), this);
          if (log.isDebugEnabled()) {
            log.debug("...");
          }
        } else {
          // Cannot create new connection
          PooledConnection oldestActiveConnection = state.activeConnections.get(0);
          long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
          if (longestCheckoutTime > poolMaximumCheckoutTime) {
            // Can claim overdue connection
            state.claimedOverdueConnectionCount++;
            state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
            state.accumulatedCheckoutTime += longestCheckoutTime;
            state.activeConnections.remove(oldestActiveConnection);
            if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
              try {
                oldestActiveConnection.getRealConnection().rollback();
              } catch (SQLException e) {
                log.debug("Bad connection. Could not roll back");
              }  
            }
            conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
            conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
            conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
            oldestActiveConnection.invalidate();
            if (log.isDebugEnabled()) {
              log.debug("....");
            }
          } else {
            // Must wait
            try {
              if (!countedWait) {
                state.hadToWaitCount++;
                countedWait = true;
              }
              if (log.isDebugEnabled()) {
                log.debug("...");
              }
              long wt = System.currentTimeMillis();
              state.wait(poolTimeToWait);
              state.accumulatedWaitTime += System.currentTimeMillis() - wt;
            } catch (InterruptedException e) {
              break;
            }
          }
        }
      }
      if (conn != null) {
        // ping to server and check the connection is valid or not
        if (conn.isValid()) {
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
          conn.setCheckoutTimestamp(System.currentTimeMillis());
          conn.setLastUsedTimestamp(System.currentTimeMillis());
          state.activeConnections.add(conn);
          state.requestCount++;
          state.accumulatedRequestTime += System.currentTimeMillis() - t;
        } else {
          if (log.isDebugEnabled()) {
            log.debug("...");
          }
          state.badConnectionCount++;
          localBadConnectionCount++;
          conn = null;
          if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
            if (log.isDebugEnabled()) {
              log.debug("...");
            }
            throw new SQLException("...");
          }
        }
      }
    }

  }

  if (conn == null) {
    if (log.isDebugEnabled()) {
      log.debug("...");
    }
    throw new SQLException("...");
  }

  return conn;
}

通 过 前 面 对 PooledConnection.invoke()方法的分析我们 知道,当 调 用连 接的代理对 象的 close()方 法 时 ,并未关闭真正的数据连接 ,而是调用PooledDataSource.pushConnection()方法将 PooledConnection对 象归 还 给 连 接池,供之后重用。

PooledDataSource.pushConnection()方法也是 PooledDataSource的核心逻 辑 之一,其逻 辑 如图

image-20200609151224380

PooledDataSource.pushConnection()代码如下:

protected void pushConnection(PooledConnection conn) throws SQLException {

  synchronized (state) {
    state.activeConnections.remove(conn);
    if (conn.isValid()) {
      if (state.idleConnections.size() < poolMaximumIdleConnections 
          && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
        state.accumulatedCheckoutTime += conn.getCheckoutTime();
        if (!conn.getRealConnection().getAutoCommit()) {
          // 回滚未提交的事务
          conn.getRealConnection().rollback();
        }
        PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
        state.idleConnections.add(newConn);
        newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
        newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
        conn.invalidate();
        if (log.isDebugEnabled()) {
          log.debug("...");
        }
        state.notifyAll();
      } else { // 空闲连接数已达到上限或PooledConnection对象并不属于该连接池
        state.accumulatedCheckoutTime += conn.getCheckoutTime();
        if (!conn.getRealConnection().getAutoCommit()) {
          conn.getRealConnection().rollback();
        }
        conn.getRealConnection().close();
        if (log.isDebugEnabled()) {
          log.debug("...");
        }
        conn.invalidate();
      }
    } else {
      if (log.isDebugEnabled()) {
        log.debug("...");
      }
      // 统计无效PooledConnection对象个数
      state.badConnectionCount++;
    }
  }
}

这里需要注意的是,PooledDataSource.pushConnection()方法和popConnection()方法中都调 用了 PooledConnection.isValid()方 法 来 检 测 PooledConnection的 有 效 性 , 该 方 法 除 了 检 测 PooledConnection.valid 字段的值 ,还 会 调 用 PooledDataSource.pingConnection()方法尝 试 让 数 据 库 执 行podPingQuery字段中记 录 的测 试 SQL语 句,从 而检 测 真 正的数 据库 连 接对 象是否依然 可以正常使用。

public boolean isValid() {
  return valid && realConnection != null && dataSource.pingConnection(this);
}

protected boolean pingConnection(PooledConnection conn) {
  boolean result = true;

  try {
    result = !conn.getRealConnection().isClosed();
  } catch (SQLException e) {
    if (log.isDebugEnabled()) {
      log.debug("...");
    }
    result = false;
  }

  if (result) {
    if (poolPingEnabled) {
      // 检测poolPingEnabled设置,是否运行执行测试SQL语 句
      // 长时间(超过 poolPingConnectionsNotUsedFor指定的时长)未使用的连接,才需要ping
      // 操作来检测数据库连接是否正常
      if (poolPingConnectionsNotUsedFor >= 0 
          && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
        try {
          if (log.isDebugEnabled()) {
            log.debug("...");
          }
          Connection realConn = conn.getRealConnection();
          Statement statement = realConn.createStatement();
          ResultSet rs = statement.executeQuery(poolPingQuery);
          rs.close();
          statement.close();
          if (!realConn.getAutoCommit()) {
            realConn.rollback();
          }
          result = true;
          if (log.isDebugEnabled()) {
            log.debug("...");
          }
        } catch (Exception e) {
          log.warn("...");
          try {
            conn.getRealConnection().close();
          } catch (Exception e2) {
            //ignore
          }
          result = false;
          if (log.isDebugEnabled()) {
            log.debug("...");
          }
        }
      }
    }
  }
  return result;
}

最后需要注意的是PooledDataSource.forceCloseAll()方法,当 修改PooledDataSource的字段 时 ,例如数 据库 URL用户名密码autoCommit配置等,都会 调 用forceCloseAll()方法将 所 有数 据库 连 接关 闭 ,同时 也会 将 所有相应 的PooledConnectiori对 象都设 置为 无效,清 空 activeConnections 集 合 和 idleConnections 集 合 。

应用系统之后通过PooledDataSource.getConnection()获取连接时,会按照新的配置重新配置新的数据库连接以及相应的PooledConnection对象。

public void forceCloseAll() {
  synchronized (state) {
    expectedConnectionTypeCode = assembleConnectionTypeCode(
      dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
    for (int i = state.activeConnections.size(); i > 0; i--) {
      try {
        PooledConnection conn = state.activeConnections.remove(i - 1);
        conn.invalidate();

        Connection realConn = conn.getRealConnection();
        if (!realConn.getAutoCommit()) {
          realConn.rollback();
        }
        realConn.close();
      } catch (Exception e) {
        // ignore
      }
    }
    for (int i = state.idleConnections.size(); i > 0; i--) {
      try {
        PooledConnection conn = state.idleConnections.remove(i - 1);
        conn.invalidate();

        Connection realConn = conn.getRealConnection();
        if (!realConn.getAutoCommit()) {
          realConn.rollback();
        }
        realConn.close();
      } catch (Exception e) {
        // ignore
      }
    }
  }
  if (log.isDebugEnabled()) {
    log.debug("PooledDataSource forcefully closed/removed all connections.");
  }
}

参考

  • 《MyBatis技术内幕》

  • 部分图片来源——《MyBatis技术内幕》

MyBatis 《MyBatis技术内幕》 基础支持层

相关推荐



版权声明




留言区

文章目录