commons-pool
- ObjectPool 对象池,定义基础方法,获取对象、归还对象、添加对象。
- PooledObjectFactory 对ObjectPool方法的补充,创建池中的对象和销毁对象。
- ObjectPoolFactory 对象池的工厂类,用于创建对象池
commons-pool2提供的是面向接口的编程,为我们提供了一个抽象的对象池管理方式。我们只需要按实际业务去重写或实现一些方法和接口即可。
GenericObjectPool
对象池的核心类,它实现了对对象池的管理,是一个基本的对象池实现(ObjectPool接口)。一般情况下,我们可以直接使用它。它的构造函数提供了两个重要的参数:GenericObjectPoolConfig<T>
类和PooledObjectFactory<T>
接口的实现。PooledObjectFactory<T>
接口的实现由我们继承并实现。GenericObjectPool
内部持有两个数据结构:ConcurrentHashMap
和LinkedBlockingDeque
(为什么使用该数据结构?)。前者用于存储所有的对象(不含销毁的对象),后者用于存储空闲的对象。
borrowObject
T borrowObject(final long borrowMaxWaitMillis)
- 从空闲队列
idleObjects
获取一个资源,如果资源为null,则新建一个资源 - 如果最大资源数已达上限,则创建失败,返回null;否则直接创建新资源,并将资源添加到池中
allObjects
,然后返回新建的资源 - 如果资源创建失败返回null,判断是否已达最大等待时间,如果没有则继续获取创建资源直达超时位置;如果资源创建成功,则初始化资源状态
- 获取testOnBorrow参数是否在借用资源时校验资源有效性。如果是,则对资源进行校验,校验失败则销毁资源;
- 返回资源
returnObject
void returnObject(final T obj)
- 首先判断归还的对象是否为池中创建的对象,如果不是则抛异常提示错误;
- 获取testOnReturn参数是否在对象归还时校验对象的有效性。如果是,则对对象进行校验,校验失败则销毁对象;
- 还原对象到初始化状态;
- 判断池中的空闲对象是否已到上线,如果已达上线则销毁对象;
- 将对象重新加入空闲队列的列头或者列尾。
PooledObjectFactory
这个接口是我们要实现的,定义了创建、销毁等对池中对象的操作。在池中一个对象在使用前或使用后可能会有不同的状态,这个接口提供了标准的接口来使对象在不同的状态间切换:activeObject()
和passivateObject()
这两个方法对对象的状态进行切换。activeObject()
是使某个对象处于初始化后的状态,是在borrwoObject
从池中获取资源后调用的,是资源处于初始化后的状态。passivateObject()
是使某个对象钝化,即在归还到池中前使对象还原成初始化前的状态。
GenericObjectPoolConfig 参数配置类,允许使用者对对象池的一些参数做调整。
主要参数解释:
- testOnBorrow 获取对象的时候要校验对象的有效性
- testOnReturn 归还对象的时候要校验对象的有效性
- testWhileIdle 定时对空闲对象池进行有效性校验
- minIdle 池初始化时默认生成多少个空闲资源
- maxIdle 池中最大能储存多少个空闲资源。主要用来限制returnObject方法,return的资源是否返回到池中。如果池中空闲资源已达到上限,则归还的资源会被destroy掉。
- maxActive 激活的最大资源数。是指同时有多少个资源可以使用在borrowObject()方法起作用。如果maxActive已达上限,borrowObject方法就会阻塞等待一定时间,不能立马获取到资源。
- maxWait 在获取不到资源的时候,阻塞等待最大时间。
commons-pool2的应用 redis的Java客户端JedisPool(jedis[https://github.com/xetorthio/jedis]);dbcp数据库连接池[https://github.com/apache/commons-dbcp]。
commons-dbcp
PoolingDataSource
- Connection getConnection() 从连接池中获取一个连接
- void close() 关闭连接,但其实内部并没有真正关闭连接,而是将归还到连接池
PoolableConnectionFactory
实现了makeObject()
方法创建新的连接。
JDBC介绍
jdbc是一套数据库协议,sun定义一组接口,数据库厂商来实现。相关接口在java.sql包中,封装了对数据库的各种操作。
- Driver接口 是所有JDBC程序必须实现的接口,该接口专门提供给数据库厂商使用。编写JDBC程序时,必须指定数据库驱动程序或把类库加载到项目classpath中。
- DriverManager类 用于加载JDBC驱动并创建与数据库的连接。 1)DriverManager.registerDriver(Driver driver) 用于向DriverManager注册给定的JDBC驱动程序 2)DriverManager.getConnection(String url, String user, String pwd) 建立与数据库的连接,返回表示连接的Connection对象
- Connection接口 代表与数据库的连接
- Statement接口 用于向数据库发送SQL语句
- ResultSet接口
数据库返回的数据集,ResultSet是基于连接的,所以操作ResultSet时Connection必须保持连接。只需将配置属性的factory属性值设置为
org.apache.tomcat.jdbc.pool.DataSourceFactory
即可使用tomcat dbcp模块功能。
JDBC 4.0
JDBC驱动类的自动加载
代码中可以不用显示的通过Class.forName()
注册JDBC驱动,当首次调用DriverManager.getConnection()
方法时,DriverManager类会自动设置合适的驱动程序。
连接数据库的两种方式
这里回顾一下JDBC连接数据库的两种方式:DriverManager和DataSource
使用方式:
- DriverManager使用方式:
DriverManager.getConnection()
; - DataSource使用方式:
dataSource.getConnection()
;
什么是数据源 JDBC2.0 提供了javax.sql.DataSource接口,它负责建立与数据库的连接,当在应用程序中访问数据库时不必编写连接数据库的代码,直接引用DataSource获取数据库的连接对象即可。用于获取操作数据Connection对象,DataSource接口由数据库驱动提供商实现。
DataSource数据源通过JNDI方式实现,使用DataSource数据源要先在JNDI中注册该数据源对象。
如果在JNDI中注册了数据源对象,将会比起使用DriverManager来具有两个方面的优势:
首先,程序不需要像使用DriverManager一样对加载的数据库驱动程序信息进行硬编码,程序员可以选择先在JNDI中注册这个数据源对象,然后在 程序中使用一个逻辑名称来引用它,JNDI会自动根据你给出的名称找到与这个名称绑定的DataSource对象。然后就可以使用这个 DataSource对象来建立和具体数据库的连接了。
其次,使用实现了DataSource接口的类所具有的第二个优势体现在连接池和分布式事务上。连接池通过对连接的复用而不是新建一个物理连接来显著地提高程序的效率。从而适用于任务繁忙、负担繁重的企业级分布式事务。
JDBC 四大对象
- DataSource
- Connection
- Statement
- ResultSet
Spring JdbcTempate
将Statement的重复代码给解决了,进行了封装。
数据库连接池
JdbcTemplate只解决了重复代码的问题,并没有解决连接的资源问题,所以需要连接池。
一些开源的数据库连接池:
- DBCP Aache
- C3P0
- Druid
- Hikari
持久层框架
持久层框架帮忙解决了对象映射问题,将查询的数据自动映射到对象上。
一些开源的持久层框架:
- MyBatis
- Hibernate
tomcat jdbc-pool
基于commoms-pool和commons-dbcp进行优化。
ConnectionPool
连接池内部通过持有PooledConnection
来实现池化技术。
void init(PoolConfiguration properties)
内部持有busy和idle两个LinkedBlockingQueue
结构的连接池队列
tomcat jdbc-pool官网上说它解决/优化了commons-dbcp的一些地方:
- 新增了拦截器,在连接归还到连接池时会检查连接的statement是否关闭,如果没有则将其关闭。
- 修复bug49831,当连接池关闭的时候确保事务连接XAConnections是关闭的
- 优化连接获取阻塞问题,当另一个线程释放连接的时候,正在获取新连接的其他线程不会被阻塞
- 在不增加线程的情况下,新增异步获取连接的方式。
- 新增饥饿证明,等待连接最久的线程会优化获取到连接。
看看tomcat是如何实现拦截器的
abstract class JdbcInerceptor implements InvocationHandler
实现InvocationHandler接口,通过动态代理方式?其他类通过继承该类实现拦截器的作用。
class StatementFinalizer extends AbstractCreateStatementInterceptor
,内部定义了一个Statement的封装类StatementEntry,StatementEntry使用WeakReference
DataSourceFactory .getObjectInstance -> createDataSource new DataSource -> createPool: DataSrouceProxy -> new ConnectionPool
ConnectionPool
使用BlockingQueue来存储active,idle连接
BlockingQueue
构造函数 -> init(PoolConfiguration properties) 1. busy = new LinkedBlockingQueue<>(); 2. idle = new LinkedBlockingQueue<>(); 3. initializePoolCleaner(properties) -> new PoolCleaner().start() extend TimerTask -> checkAbandoned();checkIdle();testAllIdle(); 4. create interceptors 5. initialzie PooledConnection: borrowConnection(); -> return to the idle: returnConnection(initialPool);
borrowConnection: // Thread safe way to retrieve a connection from pool
PooledConnection borrowConnection(int wait, String username, String password)
1. idle.poll() 从idle队列获取连接 -> 校验获取到的连接的有效性: borrowConnection(now, con, username, password)
如果连接有效,则borrowedCount+1并返回连接PooledConnection,结束。
2. 如果连接获取失败.判断当前线程池是否已达上限,如果没有,则创建新的连接 createConnection(),并返回连接。
3. 如果连接线已达上限,则waitcount+1,并尝试再次获取连接idle.poll(timetowait),如果还是为null,则判断是否已超时。如果超时则抛出异常:获取连接失败;否则重新回到步骤1继续重新获取连接。
PS:在校验size.get() < getPoolProperties().getMaxActive() 这段代码是否有疑问,如果方法上没有加同步的话(getConnection没有同步),这里如果并发多个线程来获取,就会超出maxActive。这里之后再研究研究???
AS:哈哈,这里看到了。代码写的好奇妙,知道它这里是怎么防止并发的了。
size是原子类型的,第一道关卡进行正常值校验
size < maxActive
这行代码对单线程有用,多线程下是无效的,所以有了第二道关卡的校验
size.addAndGet(1) > maxActive
因为size是原子类型,这里addAndGet(1)如果大于maxActive,则再decrementAndGet然后结束。这里如果出现多线程并发的话,addAndGet(1)会永远都大于maxActive,所以这里就不用担心跳过校验。只有addAndGet(1) < maxActive才可以新建连接
createConection(now, con, username, password)
createConnection: 在busy队列满的情况下,会直接返回新建的connection。那这样这个con在busy队列里就找不到了。
if(!busy.offer(con)) {
log.dug("");
}
createdCount.incrementAndGet();
return con;
returnConnection: 如果con不是busy队列的就直接释放release;如果是则从busy队列中移除,然后判断idle队列是否可以放入,如果可以放入则结束;否则直接释放。
getConnection: borrowConnection(); return setupConnection(con); -> 动态代理生成,最终返回的是ProxyConnection extends JdbcInterceptor implements InvocationHandler
Conection connection = (Conection)proxyClassConstructor.newInstance(new Object[] {handler}); 这里创建了一个connection:PooledConnection,connection方法调用的时候都会被ProxyConnection拦截。PooledConnection内部支持的connection是第三方driver实现的connection(比如:H2的JdbcConnection)。
getConnectionAsync: tomcat dbcp pool的异步获取是通过使用CountDownLatch来实现的,ReentrantLock保证顺序 borrowConnection(0, null, null) 首先会尝试获取一次连接(不等待wait:0),如果获取成功则封装成ConnectionFuture返回; 将idle队列转为FairBlockingQueue,然后调用idle.pollAsync()方法: return new ConnectionFuture(); -> ItemFuture: ExchangeCountDownLatch:当有con使用完归还到线程池里,会判断waiters等待线程队列中是否有线程在等待资源。如果有则优先将把con交给该线程,调用setItem(con)和c.countDown()。
ConnectionFuture: get -> itemFuture.get -> ExchangeCountDownLatch.await() return ExchangeCountDownLatch.getItem();
release abandon checkIdle PolCleaner ConnectionFuture
HikariCP
- 优化并精简字码:使用Java字节码修改类库Javassist来生成委托实现动态代理,JDK Proxy生成的字节码更少
- 使用更好的并发集合类ConcurrentBag
- 使用FastList替代ArrayList SpringBoot 2.x已经使用CP作为默认的数据库连接池
HikariDataSource
final HikariPool fastPathPool volatile HikariPool pool
getConnection 从连接池获取连接
HikariPool
final ConcurrentBag
getConnection: poolEntry = connectionBag.borrow(); connection = poolEntry.createProxyConnection(); return connection;
PoolEntry
createProxyConnection: return ProxyFactory.getProxyConnection(); -> JavassistProxyFactory -> ProxyConnection
ProxyConnection
close 连接归还连接池
poolEntry.recycle();
hikariPool.recycle();
connectionBag.requite();
SynchronousQeueue
commons-dbcp tomcat dbcp-pool hikaricp
- commons-dbcp 使用LinkedBlockingDeque
- dbcp-pool 使用LinkedBlockingQueue
- cp 使用ThreadLocal<List
连接池常用数据结构
LinkedBlockingQueue
由链表实现的有界阻塞队列,FIFO特性,队列容量大小默认为Integer.MAX_VALUE,使用时建议手动修改,避免队列过大造成机器负载或内存爆满(如果添加速度远大于删除速度,有可能会内存溢出,这里要注意)。 其内部添加和删除操作使用两个ReentrantLock来控制并发,两个单独的锁(因此添加和删除操作并不是互斥操作,可以同时进行。在数据库连接池应用中就不会阻塞获取连接和归还连接操作,提高性能),吞吐量优于基于数组的ArrayBlockingQueue(内部使用一个ReentrantLock控制并发)。
节点数据结构
static final class Node<E> {
E item;
Node<E> next;
Node(E x) {
item = x;
}
}
LinkedBlockingQueue 属性
private final int capacity; // 队列容量,默认为Integer.MAX_VALUE
private final AtomicInteger count = new AtomicInteger(0); 统计队列中元素数量(很多结构都会专门用一个变量来保存实际大小方便统计和操作)
private transient Node<E> head; // head of linked list
private transient Node<E> last; // tail of linked list
private final ReentrantLock takeLock = new ReentrantLock(); // lock held by take, poll, etc
private final Condition notEmpty = takeLock.newCondition(); // wait queue for waiting takes
private final ReentrantLock putLock = new ReentrantLock(); // lock held by put, offer, etc
private final Condition notFull = takeLock.newCondition(); // wait queue for waiting puts
return con;
Integer.MAX_VALUE 1byte(字节) = 8bit(位) Java中int占用4个字节byte,即32位bit,表示的范围为2的31次方再-1.因为最高位表示正负数。
LinkedBlockingQueue 添加方法
- boolean add() 添加元素到队尾。如果添加成功返回true,队列已满则抛出异常
- boolean offer() 添加元素到队尾。如果添加成功返回true,队列已满则直接返回false
- void put() 添加元素到队尾,不返回操作结果。如果添加成功,直接结束;队列已满则进入等待直到队列有空闲空间可用。
添加方法主要实现:
- 添加时,使用putLock可重入锁保证线程安全
- 共同调用了enqueue方法实现节点的添加
- 改变队列元素数量时,调用AtomicInteger.getAndIncrement方法,保证原子性
LinkedBlockingDeque
LinkedBlockingDeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和LIFO两种操作方法,即可以从队列的头和尾同时操作;并且,该阻塞队列是线程安全的。
双向节点数据结构
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}