您的位置 首页 java

Java实现Mysql数据库连接池

在Java访问mysql的时候,需要用到jdbc驱动,传统连接方式为:

 try {
 Driver mysqlDriver = (Driver) Class.forName("com.mysql.jdbc.Driver").newInstance();
 DriverManager.registerDriver(mysqlDriver);
 Connection connection = DriverManager.getConnection("jdbc:mysql://192.168.0.***:3306/rzframe?useSSL=false&serverTimezone=UTC", "root", "*******");
 Statement statement = connection.createStatement();
 ResultSet resultSet = statement.executeQuery("select * from rz_user");//查询
 connection. close ();
 } catch (Exception e) {
 e.printStackTrace();
 }
 

我们对上面的代码做一个简单的性能测试,代码如下:

 public static void main(String[] args) {
 long start =  System .currentTimeMillis();
 CountDownLatch countDownLatch = new CountDownLatch(100);
 for (int i = 0; i < 1000; i++) {
 try {
 CountDownLatch finalCountDownLatch = countDownLatch;
 Thread thread = new  Thread (() -> {
 try {
 doJDBC();
 } catch (Exception ex) {
 } finally {
 finalCountDownLatch.countDown();
 }
 });
 thread.start();
 if (i != 0 && i % 100 == 0) {
 countDownLatch.await();
 System.out.println(i);
 countDownLatch = new CountDownLatch(100);
 }
 } catch (Exception ex) {
 }
 }
 long end = System.currentTimeMillis();
 System.out.println("耗时:" + (end - start));
 }
 

上面代码用了100个 线程 分批次去完成查询的动作,在我的机器上运行时间45s左右。


从上面的代码可以看出问题, Connection对象 每一次都是重新创建,查询完成后,直接是调用close方法,如果不释放,会报连接数过多的异常。 如果查询多次,那浪费在创建Connection的时间就会很多,我们知道在程序优化的手段中,有一个池化可以很好的解决这个问题。

池化的概念就是先创建多个对方存在在一个容器中,当时候的时候可以直接拿出来时候,用完后再进行归还。 跟着这个思想,我们来创建自己的连接池。


编写思路

  1. 创建一个线程安全的容器(由于是多线程访问),队列或者是list,因为Connection的对象并不是有序的,所以可以使用list容器
  2. 对Connection的对象进行封装,增加一个is busy 变量,每次读取的时候就可以选出空闲的Connection对象
  3. 如果取的时候,没有可用的Connection对象,则可以再自动创建对象,可以自动扩容,直到扩容到允许的最大值。

封装的Connection类:

public class PooledConnection {
 private boolean isBusy=false;
 private Connection connection;
 public PooledConnection(Connection connection, boolean b) {
 this.isBusy=b;
 this.connection=connection;
 }
 public boolean isBusy() {
 return isBusy;
 }
 public void setBusy(boolean busy) {
 isBusy = busy;
 }
 public Connection getConnection() {
 return connection;
 }
 public void setConnection(Connection connection) {
 this.connection = connection;
 }
 public void close() {
 this.setBusy(false);
 }
}
 

包装好Connection后,可以考虑如何对Connection进行创建和分配,需要有以下几个方法:

PooledConnection getPooledConnection();
void createPooledConnection();
 

为了更好的程序调试,先定义几个初始的参数变量:

 //数据库相关参数
 private static String jdbcDriver = null;
 private static String jdbcUrl = null;
 private static String userName = null;
 private static String password = null;
 
 //容器参数
 private static int initCount;//初始数量
 private static int stepSize;//每次扩容的数量
 private static int poolMaxSize;//最大数量
 //全局锁
 private static  lock  lock;
 

为了保证线程安全,使用线程安全的Vector集合。


获得对象方法

  1. 获得对象的方法,应该是先找到一个空闲的PooledConnection变量,如果有就直接返回。
  2. 如果没有空闲的变量,则尝试进行扩充,扩充由一个线程完成,其他线程则等待,或者尝试再次获取。
public PooledConnection getPooledConnection() throws RuntimeException, SQLException {
 PooledConnection realConnection = getRealConnection();
 while (realConnection == null) {
 if (lock.tryLock()) {//尝试获取锁
 createConnections(stepSize);//只能让一个线程扩容 获得锁之后进行扩容
 lock.unlock();
 } else {
 try {
 Thread.sleep(200);//线程等待
 } catch (InterruptedException e) {
 }
 }
 realConnection = getRealConnection();//再次尝试获取
 if (realConnection != null) {
 return realConnection;
 }
 }
 System.out.println("线程池线程数量:" + PoolsConnections.size());
 return realConnection;
 }
 private PooledConnection getRealConnection() throws SQLException {
 for (PooledConnection pooledConnection : PoolsConnections) {
 try {
 if (pooledConnection.isBusy())
 continue;
 Connection connection = pooledConnection.getConnection();
 if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
 System.out.println("连接无效");
 Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
 pooledConnection.setConnection(validConnect);
 }
 pooledConnection.setBusy(true);
 return pooledConnection;
 } catch (SQLException e) {
 return null;
 }
 }
 return null;
 }
 

扩容方法对象

扩容的方法相对比较简单,判断当前对象数量有没有溢出,如果没有溢出,就进行扩容

public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException {
 if (poolMaxSize <= 0) {
 System.out.println("创建管道对象失败,最大值参数错误");
 throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
 }
 //判断是否有溢出
 boolean overFlow = isOverFlow(count);
 if (overFlow) {
 return;
 }
 System.out.println("扩容");
 for (int i = 0; i < count; i++) {
 try {
 overFlow = isOverFlow(count);
 if (overFlow)
 return;
 Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
 PooledConnection pooledConnection = new PooledConnection(connection, false);
 PoolsConnections.add(pooledConnection);
 } catch (SQLException e) {
 e.printStackTrace();
 }
 }
 System.out.println("扩容数量:" + PoolsConnections.size());
 }
 private boolean isOverFlow(int count) {
 if (PoolsConnections.size() + count >= poolMaxSize) {
 return true;
 }
 return false;
 } 
 

上面的代码隐藏一个问题,我们增加对数据的查询方法,方便我们测试。 查询方法如下:

 public ResultSet querySql(String sql) {
 try {
 PooledConnection pooledConnection = getPooledConnection();
 Connection connection = pooledConnection.getConnection();
 Statement statement = connection.createStatement();
 ResultSet resultSet = statement.executeQuery(sql);
 Thread.sleep(1000);
 pooledConnection.close();
 return resultSet;
 } catch (Exception e) {
 }
 return null;
 }
 

我们对代码做性能测试同样的测试,在我的电脑运行时间为5s左右,大概快了10倍。 但经过多次测试,代码抛出了ConcurrentModificationException异常,这个异常的原因是因为在使用的时候,我们又修改了正在使用的对象。所以在使用的时候要对对象进行加一个 读写锁

为了锁不至于影响到锁的性能,我们把锁碎片化,采用针对每一个对象进行加锁,而不是全局加锁。修改后的封装对象:

 public class PooledConnection {
 private boolean isBusy = false;
 private Connection connection;
 private ReentrantReadWriteLock reentrantReadWriteLock;
 public PooledConnection(Connection connection, boolean b) {
 this.connection = connection;
 reentrantReadWriteLock = new ReentrantReadWriteLock();
 }
 public boolean isBusy() {
 return isBusy;
 }
 public void setBusy(boolean busy) {
 isBusy = busy;
 }
 public Connection getConnection() {
 return connection;
 }
 public void setConnection(Connection connection) {
 this.connection = connection;
 }
 public void close() {
 this.setBusy(false);
 }
 public void shutDown() {
 try {
 this.connection.close();
 } catch (SQLException e) {
 e.printStackTrace();
 }
 }
 //增加读写锁的操作
 public void writeLock() {
 this.reentrantReadWriteLock.writeLock().lock();
 }
 public void unWriteLock() {
 this.reentrantReadWriteLock.writeLock().unlock();
 }
 public void readLock() {
 this.reentrantReadWriteLock.readLock().lock();
 }
 public void unReadLock() {
 this.reentrantReadWriteLock.readLock().unlock();
 }
 }
 

最终结果:

public PooledConnection getPooledConnection() throws RuntimeException, SQLException {
 if (poolMaxSize <= 0) {
 System.out.println("创建管道对象失败,最大值参数错误");
 throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
 }
 PooledConnection realConnection = getRealConnection();
 while (realConnection == null) {
 if (lock.tryLock()) {//尝试获取锁
 createConnections(stepSize);//获得锁之后进行扩容
 lock.unlock();
 } else {
 try {
 Thread.sleep(200);
 } catch (InterruptedException e) {
 }
 }
 realConnection = getRealConnection();
 if (realConnection != null) {
 return realConnection;
 }
 }
 return realConnection;
 }
 private PooledConnection getRealConnection() {
 for (PooledConnection pooledConnection : PoolsConnections) {
 try {
 if (pooledConnection.isBusy())
 continue;
 /*
 此处要保证写的时候不能被读取,不然会报ConcurrentModificationException异常
 */
 pooledConnection.writeLock();//读写互斥,写写互斥
 Connection connection = pooledConnection.getConnection();
 if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
 Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
 pooledConnection.setConnection(validConnect);
 }
 pooledConnection.setBusy(true);
 pooledConnection.unWriteLock();
 return pooledConnection;
 } catch (SQLException e) {
 return null;
 }
 }
 return null;
 }
 public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException {
 if (poolMaxSize <= 0) {
 System.out.println("创建管道对象失败,最大值参数错误");
 throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
 }
 //判断是否有溢出
 boolean overFlow = isOverFlow(count);
 if (overFlow) {
 return;
 }
 System.out.println("扩容");
 for (int i = 0; i < count; i++) {
 try {
 overFlow = isOverFlow(count);
 if (overFlow)
 return;
 Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
 PooledConnection pooledConnection = new PooledConnection(connection, false);
 PoolsConnections.add(pooledConnection);
 } catch (SQLException e) {
 }
 }
 System.out.println("扩容数量:" + PoolsConnections.size());
 }
 private boolean isOverFlow(int count) {
 if (PoolsConnections.size() + count >= poolMaxSize) {
 return true;
 }
 return false;
 } 
 

碰到问题

1.首先是无法控制连接最大的数量 ,问题出在扩容没有控制一个线程扩容,使用tryLock解决,代码如下:

while (realConnection == null) {
 if (lock.tryLock()) {//尝试获取锁
 createConnections(stepSize);//只能让一个线程扩容 获得锁之后进行扩容
 lock.unlock();
 } else {
 try {
 Thread.sleep(200);
 } catch (InterruptedException e) {
 }
 }
 realConnection = getRealConnection();
 if (realConnection != null) {
 return realConnection;
 }
 }
 

2.ConcurrentModificationException异常,在读取的使用的时候,对象有写入操作,需要保证读取可以并发,读写不能一起,写不同对象是可以并发,使用读写锁可以解决:

reentrantReadWriteLock.writeLock().lock();//读写互斥,写写互斥
 if (!connection.isValid(2000)) {//是否有效,200ms 没有被超时
 System.out.println("连接无效");
 Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
 pooledConnection.setConnection(validConnect);
 }
 pooledConnection.setBusy(true);
 reentrantReadWriteLock.writeLock().unlock();
reentrantReadWriteLock.readLock().lock();
 Statement statement = connection.createStatement();
 ResultSet resultSet = statement.executeQuery(sql);
 reentrantReadWriteLock.readLock().unlock(); 
 

使用上面的代码会存在一个性能问题,就是在写入的时候,如果写入的是不同对象,写入也会进行排斥,所以应该对单个PooledConnection使用锁。

3.把锁进行碎片化优化

public class PooledConnection {
 private boolean isBusy = false;
 private Connection connection;
 private boolean isUsing = false;
 private ReentrantReadWriteLock reentrantReadWriteLock;
 public PooledConnection(Connection connection, boolean b) {
 this.isBusy = b;
 this.connection = connection;
 this.isUsing = false;
 reentrantReadWriteLock = new ReentrantReadWriteLock();
 }
 public PooledConnection() {
 reentrantReadWriteLock = new ReentrantReadWriteLock();
 }
 public boolean isBusy() {
 return isBusy;
 }
 public void setBusy(boolean busy) {
 isBusy = busy;
 }
 public Connection getConnection() {
 this.isUsing = true;
 return connection;
 }
 public boolean isUsing() {
 return isUsing;
 }
 public void setUsing(boolean using) {
 isUsing = using;
 }
 public void setConnection(Connection connection) {
 this.connection = connection;
 }
 public void close() {
 this.isUsing = false;
 this.setBusy(false);
 }
 public void shutDown() {
 try {
 this.connection.close();
 } catch (SQLException e) {
 e.printStackTrace();
 }
 }
 public void writeLock() {
 this.reentrantReadWriteLock.writeLock().lock();
 }
 public void unWriteLock() {
 this.reentrantReadWriteLock.writeLock().unlock();
 }
 public void readLock() {
 this.reentrantReadWriteLock.readLock().lock();
 }
 public void unReadLock() {
 this.reentrantReadWriteLock.readLock().unlock();
 }
 }
 

读的时候加入读锁:

PooledConnection pooledConnection = getPooledConnection();
 /*
 此处要保证读的时候不能被修改,使用读锁
 */
 pooledConnection.readLock();
 Connection connection = pooledConnection.getConnection();
 Statement statement = connection.createStatement();
 ResultSet resultSet = statement.executeQuery(sql);
 Thread.sleep(1000);
 pooledConnection.close();
 pooledConnection.unReadLock();
 return resultSet;
 

写入加锁:

pooledConnection.writeLock();//读写互斥,写写互斥
 Connection connection = pooledConnection.getConnection();
 if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
 Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
 pooledConnection.setConnection(validConnect);
 }
 pooledConnection.setBusy(true);
 pooledConnection.unWriteLock();
 return pooledConnection;
 

优化后耗时:耗时为:3692ms 。

(本文完)

点击右上角关注作者,加关注不迷路,欢迎交流

文章来源:智云一二三科技

文章标题:Java实现Mysql数据库连接池

文章地址:https://www.zhihuclub.com/180970.shtml

关于作者: 智云科技

热门文章

网站地图