LXX的网络日志
人因梦想而伟大
JTA+Atomikos分布式事务源码剖析(二):Atomikos创建的数据库连接

AtomikosDataSourceBean,作为数据库连接池,后面创建的数据库连接都会从这来。此次主要是研究一下,它创建的数据库连接是什么?

这个com.atomikos.jdbc包下面的AtomikosDataSourceBean,打开看看。

2022111001.png

它里面有一个doInit()方法,看代码就是做了一下初始化的操作。(抛出异常的message,也不换个行-_-)

2022111002.png

大致看了看这里面的代码,貌似也没什么比较有用的信息。

不过需要提一嘴,在创建AtomikosDataSourceBean的时候,手动设置了一下

XADataSource
DruidXADataSource dataSource = new DruidXADataSource();
// Druid的一大堆鬼配置就省略了
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(dataSource);

所以,Atomikos的底层在操作一些关于数据库连接的时候,还是跟配置的连接池有关,我这里用的就是Druid。

不过关于Druid的数据库连接池代码就不去看了,因为不是我要关注的重点。
在第一张AtomikosDataSourceBean的截图,它继承了AbstractDataSourceBean,或许更加应该看看它的代码。

在代码里面找了找,找到getConnection()方法的代码

然后它调用了init()方法,应该是去做一些关于这个连接的初始化工作。

2022111003.png

然后init()方法里面,有去调用doInit()方法创建一个ConnectionFactory,这个方法是AtomikosDataSourceBean里面的方法

在上面的代码截图,doInit()方法里面有这么两行代码。这个ConnectionFactory是Atomikos自己的。把xaDataSource给传了进去,封装了一下这个xaDataSource,这个xaDataSource就是我指定的Druid的数据库连接池。

com.atomikos.datasource.pool.ConnectionFactory cf = new com.atomikos.jdbc.AtomikosXAConnectionFactory(xaDataSource, tr, this);
Configuration.addResource ( tr );

然后还有就是创建了一个ConnectionPool

if (enableConcurrentConnectionValidation) {
    connectionPool = new ConnectionPoolWithConcurrentValidation(cf, this);
} else {
    if ( getTestQuery() != null ) 
        LOGGER.logWarning ( this + ": testQuery set - pool may be slower / you might want to consider setting maxLifetime instead..." );
    connectionPool = new ConnectionPoolWithSynchronizedValidation(cf, this);
}

2022111004.png

然后再回到getConnection()方法里面,下面这行代码

通过连接池获取了一个连接

connection = (Connection) connectionPool.borrowConnection();

然后进去borrowConnection()方法里面看看

它先去执行findExistingOpenConnectionForCallingThread()方法,获取一个Reapable,如果为null,就去执行findOrWaitForAnAvailableConnection()方法获取。

Reapable ret = null;	
ret = findExistingOpenConnectionForCallingThread();	
if (ret == null) {
    ret = findOrWaitForAnAvailableConnection();		
}

2022111005.png

那就看看findExistingOpenConnectionForCallingThread()方法里面干了什么。

发现就去调用了recycleConnectionIfPossible()方法。

2022111006.png

再进去看看recycleConnectionIfPossible()方法。

看代码,遍历一下这个connections,然后获取出来XPooledConnection,然后去执行canBeRecycledForCallingThread()方法,先不管这个方法是干嘛用的。

createConnectionProxy()方法,创建了一个Reapable,那就进去看看。

2022111007.png

进来createConnectionProxy()方法,调了好几个方法,我觉得最主要的还是doCreateConnectionProxy()方法

2022111008.png

进去doCreateConnectionProxy()方法看看,这里面的代码就是设置一下事务的隔离级别

然后传入connection , sessionHandleState,创建一个Reapable

2022111009.png

进来newInstance()方法一看,发现主要还是给这个Connection创建一个代理类。

2022111010.png

我觉得最主要的还是AtomikosConnectionProxy。

然后进去之后,有一大堆代码,最主要还是invoke()方法。

2022111011.png

简单的总结一下,在ORM框架执行SQL前,肯定要获取一下数据库连接,然后Atomikos就是把这个传入的XADataSource进行了封装,然后给Connection创建了一个动态代理AtomikosConnectionProxy。

然后在invoke()方法里面,会对一些操作进行判断,去执行一些逻辑,例如抛出一些异常啊,做一些通知啊等等。