您现在的位置是:首页 >其他 >Spring 事务之@Transactional网站首页其他
Spring 事务之@Transactional
Spring 事务之@Transactional
我们以DataSourceTransactionManager作为事务管理器进行分析,Mybatis作为持久层框架
@EnableTransactionManagement
@Configuration
public class Config {
/**
* 注解 @EnableTransactionManagement 开启事务支持,等同于xml配置文件中的 <tx:annotation-driven />
* <p>会自动注册一个 PlatformTransactionManager 事务管理器
* <p>1、如果添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 DataSourceTransactionManager 实例。
* <p>2、如果添加的是 spring-boot-starter-data-jpa 依赖,框架会默认注入 JpaTransactionManager 实例。
*/
@Bean
public PlatformTransactionManager transactionManager(@Autowired DataSource dataSource) {
// 自己自定义注册一个事务管理器,必须传入一个 DataSource
return new DataSourceTransactionManager(dataSource);
}
}
注意了!!!
我们的事务管理器持有DataSource
7大传播机制
简单介绍一下,Spring 7大传播机制
public interface TransactionDefinition {
// require:如果当前没有事务,就新建一个事务,如果已存在一个事务中,加入到这个事务中,这是最常见的选择,也是默认的
int PROPAGATION_REQUIRED = 0;
// supports:支持当前事务,如果没有当前事务,就以非事务方法执行。
int PROPAGATION_SUPPORTS = 1;
// mandatory:使用当前事务,如果没有当前事务,就抛出异常。
int PROPAGATION_MANDATORY = 2;
// require_new:新建事务,如果当前存在事务,把当前事务挂起再新建事务。
int PROPAGATION_REQUIRES_NEW = 3;
// not_supported:以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
int PROPAGATION_NOT_SUPPORTED = 4;
// never:以非事务方式执行操作,如果当前事务存在则抛出异常。
int PROPAGATION_NEVER = 5;
// nested:嵌套的,如果当前存在事务,则在嵌套事务内执行,如果没有事务就新建一个事务。
int PROPAGATION_NESTED = 6;
}
-
我们都知道,一个
DataSource
对应一个数据源(数据库),一个DataSource
可以获取多个Connection
,一个Connection
就可以操作多次数据库。 -
而
@Transactional
通知事务,就是通过AOP
的方式,定义切面,在我们操作数据库前,获取到Connection
,再手动取消自动提交connection.setAutoCommit(false)
,执行SQL
成功就手动connection.commit()
,异常就在catch里面手动connection.rollback()
。
我们来简单模拟一下吧,实际上做法很复杂。
@Component
@Aspect
@Order(Integer.MAX_VALUE)
public class SimpleTransactionalAspect {
@Pointcut("@annotation(com.dd.demo01.aop.annotation.SimpleTransactional)")
public void point() {
}
// 简单模拟线程上下文存放connection
public static final ThreadLocal<Connection> resources = new ThreadLocal<>();
@Around("point()")
public Object simpleTransactionControl(ProceedingJoinPoint pjp) throws Throwable {
// 从 resources 获取连接
Connection connection = resources.get();
// 取消自动提交
connection.setAutoCommit(false);
Object result;
try {
// 执行目标方法
result = pjp.proceed();
} catch (Throwable e) {
// 回滚
connection.rollback();
// 关闭连接
connection.close();
throw e;
} finally {
// 清理线程上下文
resources.remove();
}
// 执行成功,提交
connection.commit();
// 关闭连接
connection.close();
return result;
}
}
实际上做法很复杂,我们现在就来一起了解了解它怎么做的吧。
如下面所示,在A事务里面调用B方法,那么B方法的会加入到A事务里面
class A {
@Transactional
public void serviceA() {
// ... 操作数据库
b.serviceB();
}
}
class B {
@Transactional
public void serviceB() {
// ... 操作数据库
}
}
我先说个前置知识吧,上面不是提到了7大传播机制吗?那加入事务,事务有事务是怎么做到的呢?
-
加入事务,典型代表,B的是
require
就会加入到A事务,A和B共同用一个连接Connection -
事务挂起与新建,典型代表,B 是
require_new
就会A事务挂起再新建事务-
事务挂起,将A事务的所有信息封装为
SuspendedResourcesHolder
,内部含有事务信息、A的连接ConnectionA
等等,再把A事务的所有信息从事务管理器TransactionSynchronizationManager
中去掉 -
事务新建,重新申请一个
Connection
,并将B事务的信息设置到TransactionSynchronizationManager
里面 -
TransactionSynchronizationManager
里面有什么东西?怎么保存的,不是说保存在线程上下文吗?你别急,马上给你瞟一眼。
-
-
事务嵌套,典型代表,B是
nested
。事务嵌套就是事务中含有另外的事务,mysql
不允许这样做,所以使用的是保存点。B事务出现异常了,那么就只回滚到建立B事务时设置的保存点,对外层A事务不受影响。注意:A、B事务用的是同一个Connection
TransactionSynchronizationManager
你要的事务同步管理器来了。
有6个ThreadLocal
,它们共同保存了当前事务的信息,可以说与当前线程绑定,上面说的事务挂起,就是将这6个ThreaadLocal对应的信息remove()
掉,暂时封装在SuspendedResourcesHolder
。但是resources
比较特殊哦。
我们平时所说的从ThreadLocal
获取连接,就是从ThreadLocal<Map<Object, Object>> resources
里面获取的。
你骗人,这明明是个Map,怎么变成了,连接,你别急,我这就分析一下,你先看注释,我在后面分析吧,这样显得比较专业一点(狗头保命)
public abstract class TransactionSynchronizationManager {
// 当前线程不同数据源与对应连接的映射: DataSource-->Connection ==> 用map考虑到一个线程之内,同时操作多个数据源,自然对应不同连接
// 当新建一个连接,开启事务后就会添加到里面。当我们获取到事务管理器tm之后,就会获取tm.dataSource来这里尝试获取连接
// 获取到了连接,说明当前存在事务了,至于是否决定要使用该连接还是新建事务,由指定的传播机制决定
// 所以:如果我们开启了异步线程,那么会导致事务失效,因为我们没有获取到连接,新建了一个连接来使用
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
// 事务同步器,事务运行时的扩展代码,每个线程可以注册N个事务同步器。
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
// 当前线程这个时候绑定的事务名
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
// 当前事务是否只读
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
// 当前事务的隔离级别
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
// 事务是否开启
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");
}
我们先分析一手resources
-
当我们切面开始的时候,先获取事务管理器
PlatformTransactionManager
,以下简称tm
(不是他妈哦),这里面持有DataSource
-
我们获取连接,通过
doGetTransaction()
方法,根据这个tm
获取内部持有的this.dataSource
-
通过
DataSource
作为key,去resources
里面的map.get(dataSource)
获取到ConnectionHolder
,看名字就知道了,持有连接。
所以你明白了怎么获取连接了吗。搜嘎,原来是ThreadLocal<Map<DataSource, ConnectionHolder>> resources
。
哎,小伙子,我只能说你太天真了,为啥类型为Object?我们想一下,XxxMaper.dml()
的时候,不是要获取连接吗。
-
首先我们知道,XxxMapper对象是一个代理对象
MapperProxy
,获取连接是通过TransactionSynchronizationManager.resourse,get()
这个map
里面获取的,map.get(sqlSessionFactory)
返回sessionHolder
对象,顾名思义,持有session
。 -
最开始,map里面没有,自然获取不到
map.get(sqlSessionFactory)==null
,通过openSession()
构建一个session
,里面持有通过Configuration
获取的DataSource
,稳了 -
现在
session
有了,sqlSessionFactory
也有了,map.put(sqlSessionFactory,session)
直接设置进去就行了 -
注意咯:
session
里面持有Executor
,Executor
持有Transaction
,Transaction
(这里是SpringManagedTransaction
类型)持有该DataSource
与Connection
,最开始创建的时候,没有Connection
,通过getConnection(dataSource)
获取,从事务同步管理器的resources
的map里面获取连接,没有连接就新建立一个连接,再设置到resources
。 -
现在你知道怎么获取连接了吧了吧。
现在,你看ThreadLocal<Map<Object, Object>> resources
的含义你知道了吗,为啥是Object类型?
欸嘿,上面提到没有连接,这是怎么回事?有一种情况:异步。我们都是从当前线程上下文设置、获取的,你异步操作,怎么可能获取到别人线程的东西嘛。
所以,ThreadLocal<Map<Object, Object>> resources
的map
-
DataSource --> ConnectionHolder
-
SqlSessionFactory --> SessionHolder
持久层先获取到SessionHolder
,再通过内部的DataSource
再去获取得到ConnectionHolder
,经历了两步。
什么?MapperProxy
既然持有DataSource
,不为啥不直接通过DataSource
获取,而是去绕这么大一圈,我不知道哦,我很菜。
基本流程概述
TransactionInterceptor#invoke()
现在,我们调用 a.serviceA(),正片开始咯。
因为a对象实际上是一个代理对象,调用的是代理方法,首先获取到所有拦截器List,依次调用,最后调用到@Transactional的切面invoke()方法,从这里开始咯。
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor{
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
// 获取调用对象的真实类型
Class<?> targetClass = (invocation.getThis() != null ?
AopUtils.getTargetClass(invocation.getThis()) : null);
// 事务开始的地方
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}
调用到父类 TransactionAspectSupport#invokeWithinTransaction()
方法。
TransactionAspectSupport#invokeWithinTransaction()
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 1. 获取 @Transactional 注解的相关参数 tas
TransactionAttributeSource tas = getTransactionAttributeSource();
// 先从缓存map获取事务属性 TransactionAttribute ,没有就创建返回,这里不是我们关注的重点,就是进行一下封装
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 2. 获取事务管理器 tm
// 获取一个 PlatformTransactionManager 类型的Bean,后面从事务管理器里获取 DataSource,再获取数据库连接
// 指定beanName:getBean(beanName,PlatformTransactionManager.class)
// 默认获取:先从缓存获取,就是上一个事务的管理器,没有再getBean(PlatformTransactionManager.class),再放入缓存
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 这个不管它,没啥用
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 3. 获取TransactionInfo,包含了tm和TransactionStatus,并且开始事务
// 看这个方法的名字就知道了,根据7大传播机制,可能创建新事务(挂起当前事务),可能加入到新事务等等
// 并且将事务信息设置到ThreadLocal得map,出现异常后,从ThreadLocal得map根据this.dataSource获取连接,进行回滚
// 这是很重要的,大半的逻辑都在这里面
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
// 4. 执行调用链的下一个通知,最后会调用目标方法
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
// 5. 回滚事务,在调用业务方法抛出异常时,事务的通知方法中,会捕获这个异常。然后对事务进行回滚操作,之后将异常再次抛出。
// 如果抛出的异常,与回滚的异常定义不匹配,则提交事务。也就是说,抛出异常了,也不一定回滚。它存在一个异常类型匹配。
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
// 6. 清理当前线程的事务相关信息
cleanupTransactionInfo(txInfo);
}
// 7. 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}else {
// ... 其他我们不看,不是重点
}
}
}
上面方法 invokeWithinTransaction()
的基本流程概述
-
首先,获取
@Transactional
注解配置的事务信息,封装为TransactionAttribute
-
获取事务管理器tm
-
@Transactional
可以指定从IOC
获取的事务管理器的名字,getBean(beanName,PlatformTransactionManager.class)
-
你没有指定名字
-
从缓存获取,缓存没有,
getBean(PlatformTransactionManager.class)
,再加入缓存,返回tm
-
-
-
创建事务
TransactionInfo txInfo =createTransactionIfNecessary(tm, txAttr,joinpointIdentification);
-
执行调用链的下一个通知,最后会调用目标方法
-
如果异常,调用回滚方法
completeTransactionAfterThrowing(txInfo, ex);
,里面会进行资源解除,可能归还(关闭)Connection,可能会提交,可能会回滚,可能既不提交也不回滚 -
没有异常,调用提交方法
commitTransactionAfterReturning(txInfo);
,里面会进行资源解除,可能归还(关闭)Connection,可能会提交,可能会回滚,可能既不提交也不回滚 -
清理当前线程的事务相关信息
然后这个方法结束了,疑问:
-
什么叫可能归还(关闭)连接?你知不知道,嵌套事务,加入
serviceB()
是require
创博机制,A,B方法共用一个Connection,你(B)把连接归还(关闭),那我(A)用什么?。-
使用连接池,自然归还连接。不使用连接池就关闭连接
-
-
什么叫可能会提交,可能会回滚,可能既不提交也不回滚?不是,哥们,你玩我呢?我先不说,我们继续往下面分析,最后你就懂了。
事务准备篇
TransactionAspectSupport#createTransactionIfNecessary()
这是个非常非常重要的方法,spring的传播机制就是在这里面完成的。
看这个方法的名字就知道了,根据7大传播机制,可能创建新事务(挂起当前事务),可能加入到新事务等等
下面我贴出主要代码
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr,
final String joinpointIdentification) {
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 1、获取事务信息,根据7大传播机制,可能创建新事务(挂起当前事务),可能加入到当前事务等等
// 返回一个已经激活的事务或创建一个新的事务
// 并且将事务信息设置到线程同步器,出现异常后,从ThreadLocal得map根据this.dataSource获取连接,进行回滚
status = tm.getTransaction(txAttr);
}
}
// 2、将目前得事务信息封装到TransactionInfo,将 TransactionInfo 绑定到线程
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
1、获取事务状态TransactionStatus
,在里面会根据事务传播机制,选择加入事务,还是新建事务,返回一个已经激活的事务或创建一个新的事务
2、将当前事务与当前线程绑定
AbstractPlatformTransactionManager#getTransaction()
返回一个已经激活的事务或创建一个新的事务
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// doGetTransaction()方法是抽象方法,具体的实现由委派给具体的子类事务处理器提供
// 从当前的 transactionManager 获取DataSource对象
// 然后以该DataSource对象为Key,去一个ThreadLocal变量中的map中尝试获取该DataSource的连接
// 然后设置到 DataSourceTransactionObject 中返回 DataSourceTransactionObject。
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
// 如果没有配置事务属性,则使用默认的事务属性
if (definition == null) {
definition = new DefaultTransactionDefinition();
}
// 判断当前是否存在事务:持有连接&&连接的transactionActive=true
if (isExistingTransaction(transaction)) {
// 当前线程已经在一个事务中了,则需要根据事务的传播级别来决定如何处理并获取事务状态对象
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
/**
* 到这里,说明当前不存事务
*/
// 到这里,说明不存在事务,如果传播机制是mandatory,直接抛异常了
// mandatory:使用当前事务,如果没有当前事务,就抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 如果事务传播特性为required、required_new或nested,到这里说明当前没有事务,需要创建新事务
// required:有事务就支持当前事物,没有就自己开启一个
// required_new:如果当前无事务则开启一个事务,否则挂起当前事务并开启新事务。
// nested:创建一个嵌套事务,如果当前无事务则创建一个事务。
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);// 事务挂起,但这里没事务,且传入null,所以什么也不做,返回null
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
// 如果当前不在一个事务中,则进行创建事务的准备操作
// 不激活和当前线程绑定的事务,因为事务传播特性配置要求创建新的事务
// newSynchronization 在 prepareSynchronization 中会通过这个字段来决定是否把事物更新到当前线程中
// 不过在 newTransactionStatus() ,会判断当前线程是否绑定事物,如果绑定就不可以赋值覆盖
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 构造一个新事务状态对象 DefaultTransactionStatus ,注意这里第三个参数为true,代表是一个新事务
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 执行begin操作,核心操作是设置隔离级别,执行 conn.setAutoCommit(false) 同时将数据连接绑定到当前线程
doBegin(transaction, definition);
// 初始化和同步事务状态,针对事务相关属性如隔离级别,是否在事务中,设置绑定到当前线程
// 出现异常后,从ThreadLocal得map根据this.dataSource获取连接,进行回滚
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
} else {
// 到这里肯定不存在事务,剩下的3个传播机制都不需要事务
// supports:支持当前事务,如果没有当前事务,就以非事务方法执行。
// not_supported:以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
// never:以非事务方式执行操作,如果当前事务存在则抛出异常。
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
// 一个空的status
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
-
从上面提到的线程同步管理器获取连接,不管有没有连接,会
new
一个DataSourceTransactionObject
返回,里面持有连接ConnectionHolder
,可能为null。Object transaction = doGetTransaction();
-
我怎么知道当前有么有事务存在?看上一步获取到的
DataSourceTransactionObject
是否持有连接,也就是是否从线程同步挂不离弃获取到了连接,如果获取到了连接,那么说明存在事务,如果没有连接就不存在事务。 -
根据有没有事务分为两种情况操作,根据事务的传播级别来决定如何处理并获取事务状态对象
DefaultTransactionStatus
DataSourceTransactionManager#doGetTransaction()
从线程同步管理器获取连接
注意咯,这是我们的事务管理器tm
调用的方法,所以内部持有this.dataSource
,我们正是根据这个dataSource
获取连接
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// 是否允许设置保存点
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// obtainDataSource():从当前事务管理器tm获取DataSource,返回this.dataSource
// 以this.dataSource为key,从ThreadLocal的map中获取对应的连接Connection返回
// ConnectionHolder持有连接Connection
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
// 将连接设置到 DataSourceTransactionObject,newConnection=false:表示这是我从当前已经存在的事务获取到的连接,并非新建的连接
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
不管有没有连接,先创建一个DataSourceTransactionObject
,用作返回对象,可能持有连接
DataSourceTransactionObject
private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {
// 是否是新建的连接
// true:新获取的连接,dataSource.getConnection(),表明当前存在事务
// false:从线程同步管理器获取的当前事务的连接,表明当前不存在事务
private boolean newConnectionHolder;
private boolean mustRestoreAutoCommit;
}
public abstract class JdbcTransactionObjectSupport implements SavepointManager, SmartTransactionObject {
// 持有的连接,有连接,,当前存在事务
// 没有连接,不存在事务
private ConnectionHolder connectionHolder;
private Integer previousIsolationLevel;
private boolean savepointAllowed = false;
}
我们只需要关注connectionHolder
和newConnectionHolder
即可
ConnectionHolder
public class ConnectionHolder extends ResourceHolderSupport {
// 保存点前缀
public static final String SAVEPOINT_NAME_PREFIX = "SAVEPOINT_";
@Nullable
private ConnectionHandle connectionHandle;
// 持有当前连接
@Nullable
private Connection currentConnection;
/**
* 当前连接 currentConnection 是否开启了事务
* <p>1、如果不存在连接,那么为false
* <p>2、如果存在连接,可能开启了事务,因为有些是在无事务的环境运行,比如 not_supported
*/
private boolean transactionActive = false;
// 是否支持保存点
@Nullable
private Boolean savepointsSupported;
// 当前连接保存点个数,当前连接每设置一个保存点,该计数器加1
// nested 传播机制需要设置保存点,保存点名字:SAVEPOINT_NAME_PREFIX+savepointCounter
private int savepointCounter = 0;
}
判断是否存在事务
上面我们已经提到,在AbstractPlatformTransactionManager#getTransaction()
方法里面的第一步就是获取当前事务的连接ConnectionHolder
,封装为DataSourceTransactionObject
根据传播机制,我们首先需要判断是否存在事务
// 当前数据库连接是否已经存在事务了,持有连接&&连接的transactionActive=true
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
说白了,存在事务必须满足:从事务同步管理器获取到了ConnectionHolder
,且属性transactionActive=true
DataSourceTransactionManager#handleExistingTransaction()
来到这个方法,说明存在事务,根据传播机制,进行相应的处理,注释已经说明清楚了。
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
/**
* 当前存在事务
*/
// never:以非事务方式执行操作,如果当前事务存在则抛出异常。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// not_supported:以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 将当前事务挂起,并将当前被事务的信息封装为SuspendedResourcesHolder保存,之后会进行恢复
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 挂起事务同时将当前事务设置为null,newTransaction设置为false,把线程的相关ThreadLocal变量移除的就像当前不存在事务一样
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 如果是required_NEW的话,则挂起当前事务,同时创建一个新的事务,执行doBegin操作
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 事务挂起
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建新事务状态
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 创建事务,获取连接,开启事务
doBegin(transaction, definition);
// 事务信息同步到当前线程
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// nested:嵌套的,如果当前存在事务,则在嵌套事务内执行,如果没有事务就新建一个事务。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}// 是否使用保存点
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
// 使用同一个事务
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 使用同一个连接,创建保存点
status.createAndHoldSavepoint();
return status;
} else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
// 这里判断是否需要对已经存在的事务进行校验,这个可以通过AbstractPlatformTransactionManager.setValidateExistingTransaction(boolean)来设置,
// 设置为true后需要校验当前事务的隔离级别和已经存在的事务的隔离级别是否一致
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
// 如果不设置是否校验已经存在的事务,则对于 required 传播级别会走到这里来,这里把 newTransaction 标志位设置为false,
// 这里用的definition是当前事务的相关属性,所以隔离级别等依然是当前事务的(子事务),而不是已经存在的事务的隔离级别(父事务)
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
事务挂起
DataSourceTransactionManager#suspend()
// 将当前事务挂起,并将当前被事务的信息封装为SuspendedResourcesHolder保存,之后会进行恢复
Object suspendedResources = suspend(transaction);
上面我们已经提到,在AbstractPlatformTransactionManager#getTransaction()
方法里面的第一步就是获取当前事务的连接ConnectionHolder
,封装为DataSourceTransactionObject
现在事务挂起,将当前事务得信息从事务同步管理器的几个ThreadLocal
移除。
虽然移除了,但是我们会封装为SuspendedResourcesHolder
保存起来,在当前事务执行完后,会取出SuspendedResourcesHolder
重新设置到事务同步管理器的几个ThreadLocal
里面。
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
/**
* 将当前事务得信息从事务同步管理器得几个ThreadLocal移除,表示挂起
* <p>并将当前被事务的信息,封装为SuspendedResourcesHolder保存,之后会进行恢复
*/
// 当前存在活跃事务
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 在这里 SqlSessionSynchronization 操作 TransactionSynchronizationManager.resources 的map,移除 sqlSessionFactory --> SqlSessionHolder 这个键值对
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 将当前事务挂起,返回的是移除的当前事务的DataSource对应map中的连接
suspendedResources = doSuspend(transaction);
}
// 移除当前事务名
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
// 移除当前事务是否可读
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
// 移除当前事务的隔离级别
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
// 当前事务取消活跃状态
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
// 将当前被挂起事务的信息,封装为SuspendedResourcesHolder,之后会进行恢复
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
} else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
} else {
// Neither transaction nor synchronization active.
return null;
}
}
doSuspend()
// 将当前事务的连接从线程上下文移除
@Override
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
// 移除当前事务的DataSource的连接并返回该连接
// 所谓的挂起,就是解绑resource,它之前存储在一个线程绑定的名为resources得ThreadLocal的map中。
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
恢复事务,怎么恢复?当然是怎么挂起的就怎么恢复啦,取出来重新设置到线程同步管理器的6个ThreadLocal
里面
DefaultTransactionStatus
看到这里相比你已经很迷惑了,什么呀,别急,先来说说DefaultTransactionStatus
你看,通过继承的方法,DefaultTransactionStatus
拥有了9个属性,这包含了当前事务的所有信息,而后面我们commit()或者是rollback()都是将一个事务的DefaultTransactionStatus
作为参数传进行。
public class DefaultTransactionStatus extends AbstractTransactionStatus {
// DataSourceTransactionObject 类型,持有ConnectionHolder,当前事务的连接信息
@Nullable
private final Object transaction;
// 是否是新建的事务
private final boolean newTransaction;
// newSynchronization:在 prepareSynchronization 中会通过这个字段来决定是否把事物更新到当前线程中
private final boolean newSynchronization;
// 是否只读
private final boolean readOnly;
private final boolean debug;
// SuspendedResourcesHolder ,上一个被挂起的事务信息,当前事务结束后需要恢复之前的事务
@Nullable
private final Object suspendedResources;
}
public abstract class AbstractTransactionStatus implements TransactionStatus {
// 回滚标志,如果为true,那么即便调用了commit()也不会提交,而是回滚
private boolean rollbackOnly = false;
// 事务是否完成
private boolean completed = false;
// 保存点,如果要回滚的话,回滚到这个保存点即可,仅限 nested 传播机制
@Nullable
private Object savepoint;
}
SuspendedResourcesHolder
封装上一个事务的信息
protected static final class SuspendedResourcesHolder {
// 上一个被挂起事务的 DataSourceTransactionObject ,持有ConnectionHolder
private final Object suspendedResources;
// 上一个被挂起事务的 事务同步器
private List<TransactionSynchronization> suspendedSynchronizations;
// 事务名
private String name;
private boolean readOnly;
private Integer isolationLevel;
// 是否活跃状态
private boolean wasActive;
}
新建事务状态
不管你有没有事务,是否加入旧事务,是否用同一个连接,是否新建事务,都必须新建一个事务状态
在 required_new 传播机制里面
// 如果是 required_new 的话,则挂起当前事务,同时创建一个新的事务,执行doBegin操作
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
// 1、事务挂起,与线程同步管理器解除资源绑定,封装为SuspendedResourcesHolder返回
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 2、创建新事务状态
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 3、创建事务,获取连接,开启事务
doBegin(transaction, definition);
// 4、事务信息同步到当前线程
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
我们前面已经分析了事务挂起,将当前存在的事务,与线程同步管理器解除资源绑定,封装为SuspendedResourcesHolder
返回。
现在,我们来分析第二步,创建新事务状态DefaultTransactionStatus
。
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
里面也没什么逻辑,就是将@Transactional
对应解析的事务信息封装到一个new DefaultTransactionStatus()
里面,还有当前被挂起事务的信息SuspendedResourcesHolder
,后面本次新事务结束后会取出来进行恢复。
创建事务
来,上一步中,我们已经创建了本次事务的状态DefaultTransactionStatus
,但是还没有与当前线程资源绑定,还没有获取连接,现在我们来看看
doBegin(transaction, definition);
新建事务(新建连接)入口
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
//如果当前事务管理器不持有连接(比如新建的)||持有连接但被锁定(属性 synchronizedWithTransaction==true),就需要创建连接了
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 创建新连接,可能从数据库连接池获取,
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 锁定当前连接,synchronizedWithTransaction=true
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 为当前事务获取并设置指定的隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 如果当前连接是自动提交的状态,那么取消自动提交,开启事务肯定要手动提交,不然异常了怎么回滚嘛
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
// 如果事务只读,那么先操作数据库设置只读, stmt.executeUpdate("SET TRANSACTION READ ONLY");
prepareTransactionalConnection(con, definition);
// 设置transactionActive=true,表示当前连接已经开启事务了,判断是否存在事务也根据这个判断
txObject.getConnectionHolder().setTransactionActive(true);
// 设置超时时间
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 如果这是刚才新建的连接
if (txObject.isNewConnectionHolder()) {
// 那么获取当前tm的this.dataSource作为key,以当前连接作为value
// 存放到TransactionSynchronizationManager.resources 这个ThreadLocal<Map<Object,Object>> 的map里面
// 最开始的时候,我们也根据 this.dataSource作为key 来尝试获取连接了的
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
在这里
// 创建新连接,可能从数据库连接池获取,
Connection newCon = obtainDataSource().getConnection();
我们进行了连接的新建(或者从连接池获取)。
// 如果这是刚才新建的连接
if (txObject.isNewConnectionHolder()) {
// 那么获取当前tm的this.dataSource作为key,以当前连接作为value
// 存放到TransactionSynchronizationManager.resources 这个ThreadLocal<Map<Object,Object>> 的map里面
// 最开始的时候,我们也根据 this.dataSource作为key 来尝试获取连接了的
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
下面的我们就不看了,都是绑定线程资源,已经注释了。
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
// 事务是否开启
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
// 事务隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
// 事务是否只读
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
// 事务名字
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
// 事务同步器
TransactionSynchronizationManager.initSynchronization();
}
}
到此为止,我们创建事务篇结束。什么?我其他 nested 传播机制没讲?你先别急,即便我不讲,你到这里了也多多少少应该看得懂了。多d几遍bug就明白了。
Spring-Mybatis获取连接
来吧,本次重点不是这个,所以我简单介绍介绍Mybatis如何获取连接。
@Transactional
public void test(User user, Long id, String newName) {
int res = userMapper.insertUser(user);
}
来到userMapper.insertUser(user)
的时候,事务已经准备好了,上面就是在讲这个,现在我们开始获取连接。
上面已经提到了,我现在cv下来复习一遍
-
首先我们知道,
XxxMapper
对象是一个代理对象MapperProxy
,获取连接是通过TransactionSynchronizationManager.resourse,get()
这个map
里面获取的,map.get(sqlSessionFactory)
返回sessionHolder
对象,顾名思义,持有session
。 -
最开始,map里面没有,自然获取不到
map.get(sqlSessionFactory)==null
,通过openSession()
构建一个session
,里面持有通过Configuration
获取的DataSource
,稳了 -
现在
session
有了,sqlSessionFactory
也有了,map.put(sqlSessionFactory,session)
直接设置进去就行了 -
注意咯:
session
里面持有Executor
,Executor
持有Transaction
,Transaction
(这里是SpringManagedTransaction
类型)持有该DataSource
与Connection
,最开始创建的时候,没有Connection
,通过getConnection(dataSource)
获取,从事务同步管理器的resources
的map里面获取连接,没有连接就新建立一个连接,再设置到resources
。 -
现在你知道怎么获取连接了吧了吧。
MapperProxy#invoke()
梦开始的地方
public class MapperProxy<T> implements InvocationHandler, Serializable {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
} else {
// 我们mapper接口定义的方法调用入口
return cachedInvoker(method).invoke(proxy, method, args, sqlSession);
}
}
}
PlainMethodInvoker#invoke()
private static class PlainMethodInvoker implements MapperMethodInvoker {
@Override
public Object invoke(Object proxy, Method method, Object[] args, SqlSession sqlSession) throws Throwable {
// 这里这里
return mapperMethod.execute(sqlSession, args);
}
}
MapperMethod#execute()
public class MapperMethod {
public Object execute(SqlSession sqlSession, Object[] args) {
Object result;
switch (command.getType()) {
case INSERT: {
Object param = method.convertArgsToSqlCommandParam(args);
// 这里的 sqlSession.insert() 进去
result = rowCountResult(sqlSession.insert(command.getName(), param));
break;
}
// ...其他,这里不管
}
return result;
}
}
SqlSessionTemplate#insert()
public class SqlSessionTemplate implements SqlSession, DisposableBean {
private final SqlSession sqlSessionProxy;
@Override
public int insert(String statement, Object parameter) {
// 这里sqlSessionProxy 是个代理对象,实际类型为 SqlSessionInterceptor
return this.sqlSessionProxy.insert(statement, parameter);
}
}
SqlSessionInterceptor#invoke()
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 在这里,从线程同步管理器获取 SqlSessionHolder
SqlSession sqlSession = getSqlSession(
SqlSessionTemplate.this.sqlSessionFactory,
SqlSessionTemplate.this.executorType,
SqlSessionTemplate.this.exceptionTranslator);
try {
// 执行sql,在这里面 从线程同步管理器获取连接
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
// ... 其他,提多了,我就不贴了,也不是重点
}
throw unwrapped;
} finally {
if (sqlSession != null) {
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
}
}
}
SqlSessionUtils#getSqlSession()
public final class SqlSessionUtils {
// 根据sessionFactory 从线程同步管理器获取 SqlSessionHolder
SqlSessionHolder holder = (SqlSessionHolder)
TransactionSynchronizationManager.getResource(sessionFactory);
SqlSession session = sessionHolder(executorType, holder);
if (session != null) {
return session;
}
// 获取不到,创建一个 DefaultSqlSession 设置到线程同步管理器里面
session = sessionFactory.openSession(executorType);
registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
return session;
}
SqlSessionUtils#registerSessionHolder()
private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
SqlSessionHolder holder;
// 当前事务存在活跃事务
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 获取Mybatis的上下文Configuration里面的数据源DataSource,Environment持有DataSource
// 看到这里,你是否有疑问,我们的连接不是从IOC获取的,而是从Configuration获取的
Environment environment = sessionFactory.getConfiguration().getEnvironment();
if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Registering transaction synchronization for SqlSession [" + session + "]");
}
holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
// 当我们new了一个SqlSessionHolder后,就要与当前线程绑定,这个方法想相信你之前也看烂了吧
TransactionSynchronizationManager.bindResource(sessionFactory, holder);
// 欸嘿,这里注册了一个 SqlSessionSynchronization 的间接父类 TransactionSynchronization
TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
holder.setSynchronizedWithTransaction(true);
holder.requested();
} else {
// ...
}
}
我们将 SqlSessionSynchronization
注册到 synchronizations
里面了什么时候使用的?别慌,后面分析,在事务快结束的时候调用。
// 事务同步器,事务运行时的扩展代码,每个线程可以注册N个事务同步器。比如 SqlSessionSynchronization
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
DefaultSqlSession#update()
@Override
public int insert(String statement, Object parameter) {
// 实际调用 update()
return update(statement, parameter);
}
@Override
public int update(String statement, Object parameter) {
MappedStatement ms = configuration.getMappedStatement(statement);
// 来吧,在这里面获取连接
return executor.update(ms, wrapCollection(parameter));
}
SimpleExecutor#doUpdate()
中间的我省略了,直接来到这里
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
// 这里获取连接
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.update(stmt);
} finally {
closeStatement(stmt);
}
}
SimpleExecutor#prepareStatement()
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
// 根据datSource从事务同步管理器获取连接,没错,就是我们之前设置的那个连接
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
return stmt;
}
BaseExecutor#getConnection()
protected Connection getConnection(Log statementLog) throws SQLException {
// 获取连接,因为整和了spring-mybatis,所以创建的是 SpringManagedTransaction ,自然使用它的方法getConnection()
Connection connection = transaction.getConnection();
if (statementLog.isDebugEnabled()) {
return ConnectionLogger.newInstance(connection, statementLog, queryStack);
} else {
return connection;
}
}
SpringManagedTransaction# getConnection()
获取连接
@Override
public Connection getConnection() throws SQLException {
if (this.connection == null) {
openConnection();
}
return this.connection;
}
private void openConnection() throws SQLException {
// 获取连接
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
// 打印更换了连接,打印日志,这个不管,不重要
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"JDBC Connection ["
+ this.connection
+ "] will"
+ (this.isConnectionTransactional ? " " : " not ")
+ "be managed by Spring");
}
}
DataSourceUtils #doGetConnection()
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
// 根据datSource从事务同步管理器获取连接,没错,就是我们之前设置的那个连接
ConnectionHolder conHolder = (ConnectionHolder)
TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();// 计数,不管
if (!conHolder.hasConnection()) {
// fetchConnection():通过dataSource.getConnection()新建连接
conHolder.setConnection(fetchConnection(dataSource));
}
return conHolder.getConnection();
}else {
// ... 其他的不看
}
现在,Mybatis
获取连接篇结束了,想必一路看下来你已经知道了spring-mybatis整合的情况下是怎么获取连接了吧。
事务结束篇
回忆一下,我们上一篇讲述了在执行目标方法后Mybatis如何获取连接,那么方法执行完毕了,该是提交了吧,我们先说回滚吧。
// 3. 获取TransactionInfo,包含了tm和TransactionStatus,并且开始事务
// 看这个方法的名字就知道了,根据7大传播机制,可能创建新事务(挂起当前事务),可能加入到新事务等等
// 并且将事务信息设置到ThreadLocal得map,出现异常后,从ThreadLocal得map根据this.dataSource获取连接,进行回滚
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
// 4. 执行调用链的下一个通知,最后会调用目标方法
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
// 5. 回滚事务,在调用业务方法抛出异常时,事务的通知方法中,会捕获这个异常。然后对事务进行回滚操作,之后将异常再次抛出。
// 如果抛出的异常,与回滚的异常定义不匹配,则提交事务。也就是说,抛出异常了,也不一定回滚。它存在一个异常类型匹配。
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
// 6. 清理当前线程的事务相关信息
cleanupTransactionInfo(txInfo);
}
// 7. 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
completeTransactionAfterThrowing()
异常回滚
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
// 如果存在事务状态对象,进行回滚操作,如果没事务,你拿什么回滚
if (txInfo != null && txInfo.getTransactionStatus() != null) {
// 判断事务属性不为空并且满足回滚规则,就进行回滚,否则进行事务提交
// rollbackOn() 查看异常是否满足@Transactional指定的捕获异常
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 回滚
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (... ex2) {
throw ex2;
}
} else {
try {
// 如果抛出的异常,与回滚的异常定义不匹配,则提交事务。也就是说,抛出异常了,也不一定回滚。它存在一个异常类型匹配。
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
} catch (... ex2) {
throw ex2;
}
}
}
}
从这里我们发现
-
如果异常满足
@Transactional
指定的捕获异常,那么就可以进行回滚 -
如果不满足,那么提交
是的,你没看错,不满足异常要求就提交事务,所以你要小心了呀。
rollback()
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
// 看到我们传入的什么了吗,DefaultTransactionStatus,对指定事务回滚
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
processRollback()
数据库回滚
-
当前事务是有保存点的,回滚到保存点
-
当前事务是起点(新建的事务),直接
rollback()
-
剩下的情况是加入到原有事务的(不含保存点),只是设置
connectionHolder.rollbackOnly=true
,即便捕获了异常,那么也会在commit()
判断,connectionHolder.rollbackOnly=true
,就会回滚
最后还有统一的自定义操作,比如可能归还连接,接触绑定等:cleanupAfterCompletion()
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
// 自定义回滚前操作,
triggerBeforeCompletion(status);
// 如果是保存点
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
// 回滚到保存点位置
status.rollbackToHeldSavepoint();
} else if (status.isNewTransaction()) {
// 如果是一个新建的事务,即回到了事务的起点。
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 取出连接进行回滚
doRollback(status);
} else {
// 如果是加入的已有事务,则将事务状态设置为仅回滚 rollbackOnly=true
// 最终返回到事务的起点时,是否向外抛出异常都会回滚。
// 所以在 required 传播行为中,任何一个加入事务的方法异常,都会触发回滚。
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
// 设置仅回滚,设置该事务的 ConnectionHolder.rollbackOnly=true
// 所以即便捕获了异常,只要外层事务持有的是该ConnectionHolder都会回滚
doSetRollbackOnly(status);
}
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
} catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
} finally {
// 如果是新建的事务则释放连接,如果是新建事务,将上一个被挂起事务恢复
cleanupAfterCompletion(status);
}
}
cleanupAfterCompletion()
如果是新建的事务则释放连接,如果是新建事务,将上一个被挂起事务恢复
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
// 归还连接到连接池,没有连接池就关闭连接
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
// 恢复上一个被挂起的事务,重新绑定到当前线程
// 包括DataSource,sqlSession(重新绑定,需要注册到线程上下文的set中,如SqlSessionSynchronization)...
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
resume()
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
// 恢复上一个被挂起的事务,重新绑定到当前线程
// 包括DataSource,sqlSession(重新绑定,需要注册到线程上下文的set中,如SqlSessionSynchronization)...
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
// 恢复上一个被挂起的事务,重新绑定到当前线程
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
// 自定义事务恢复操作,需要实现 TransactionSynchronization 并注册到线程上下文
// 比如上面Mybatis注册的 SqlSessionSynchronization
doResumeSynchronization(suspendedSynchronizations);
}
}
}
SqlSessionSynchronization
这是在Mybatis
获取SqlSessionHolder
失败后,向事务同步管理器注册的
private static final class SqlSessionSynchronization extends TransactionSynchronizationAdapter {
private final SqlSessionHolder holder;
private final SqlSessionFactory sessionFactory;
private boolean holderActive = true;
public SqlSessionSynchronization(SqlSessionHolder holder, SqlSessionFactory sessionFactory) {
notNull(holder, "Parameter 'holder' must be not null");
notNull(sessionFactory, "Parameter 'sessionFactory' must be not null");
this.holder = holder;
this.sessionFactory = sessionFactory;
}
// 事务被挂起的时候,SqlSessionHolder 也要与当前线程解除绑定
@Override
public void suspend() {
if (this.holderActive) {
TransactionSynchronizationManager.unbindResource(this.sessionFactory);
}
}
// 被挂起事务恢复的时候,SqlSessionHolder 也要与当前线程重新绑定,一并恢复
@Override
public void resume() {
if (this.holderActive) {
TransactionSynchronizationManager.bindResource(this.sessionFactory, this.holder);
}
}
// 事务提交前操作,对应sqlSession也要提交,二级缓存就是在这里被正常存进去的
@Override
public void beforeCommit(boolean readOnly) {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
this.holder.getSqlSession().commit();
}
}
// 事务完成前操作,解除绑定,关闭sqlSession
@Override
public void beforeCompletion() {
if (!this.holder.isOpen()) {
TransactionSynchronizationManager.unbindResource(sessionFactory);
this.holderActive = false;
this.holder.getSqlSession().close();
}
}
// 事务完成后操作
@Override
public void afterCompletion(int status) {
if (this.holderActive) {
TransactionSynchronizationManager.unbindResourceIfPossible(sessionFactory);
this.holderActive = false;
this.holder.getSqlSession().close();
}
this.holder.reset();
}
}
commitTransactionAfterReturning()
正常提交
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
// 获取事务管理器进行提交操作
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
commit()
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
/**
* 比如 required 传播行为,前一个方法已经开启了事务,然后当前方法就会加入到这个已存在的事务中。
* 如果这个内层的方法抛出异常,即使在外层方法中捕获异常,同样会回滚事务。
* 这个回滚操作并不是在内层方法中做的,在内层方法结束的时候,只会做一个 rollbackOnly 的标记。
* 真正的回滚是在创建事务的起点方法做的。
* 所以下面代码中,会先判断是否仅回滚,如果是,则直接执行回滚操作。
* isGlobalRollbackOnly() 的判断于此类似。如果最终判断应该提交事务,则执行 processCommit() 进行事务的提交。
*/
// 如果事务信息中保存的是仅回滚,则进行回滚操作。
// 比如假如到这个事务中的方法抛出异常,则不会再那个方法就回滚,而是返回到事务的起点方法后,再进行回滚操作。
if (defStatus.isLocalRollbackOnly()) {
// 回滚事务
processRollback(defStatus, false);
return;
}
// 判断是否仅回滚,主要是看持有的 connectionHolder 是否设置了仅回滚,在事务回滚的时候,会设置connectionHolder.rollbackOnly=true
// 所以,如果内部事务与当前事务持有同一个connectionHolder,比如 require 加入当前事务,只要内部事务异常那么就必然回滚了
// 但是 nested 内部异常,但是捕获了不会影响外部事务,因为 processRollback() 里面不会 对有保存点的回滚不会设置rollbackOnly=true
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
// 回滚事务
processRollback(defStatus, true);
return;
}
// 提交事务
processCommit(defStatus);
}
processCommit()
事务提交
-
含有保存点,仅仅抹除保存点
-
是最外层事务(起点事务,新建的事务,newTransaction=true),那么真正提交事务
-
其他,内层事务(如 require 加入到当前的事务),不做什么处理
最后还有统一的自定义操作,比如可能归还连接,接触绑定等
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
// 预提交,这是一个钩子方法,点进去可以看到是一个空方法。
prepareForCommit(status);
// 事务提交前自定义操作,需要实现 TransactionSynchronization 接口,并注册当前线程上下文
triggerBeforeCommit(status);
// 事务完成前自定义操作,需要实现 TransactionSynchronization 接口,并注册当前线程上下文
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// 是否有回滚点(保存点),如果是设置了回滚点,则仅仅把回滚点抹除即可。真正的提交是在最外层事务提交做的。
// 这也是 nested 传播行为,在外围方法事务异常时,所有嵌套事务全部回滚的原因所在。
if (status.hasSavepoint()) {
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
// 如果是事务创建的起点,提交
unexpectedRollback = status.isGlobalRollbackOnly();
// 再此提交提交事务,向我看齐
doCommit(status);
} else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// 如果有回滚标记,比如内部 require 加入的事务异常就会设置该标记 connectionHolder.rollbackOnly=true
// 但基本上不会来到这个方法,因为按照正常逻辑,在commit()方法就判断了
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (... ex) {
throw ex;
}
try {
// 事务提交后自定义操作,需要实现 TransactionSynchronization 接口,并注册当前线程上下文
triggerAfterCommit(status);
} finally {
// 事务完成后自定义操作,需要实现 TransactionSynchronization 接口,并注册当前线程上下文
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
// 如果是新建的事务,那么再此:归还连接,恢复上一个被挂起的事务,重新绑定到当前线程
// 包括DataSource,sqlSession(重新绑定,需要注册到线程上下文的set中,如SqlSessionSynchronization)...
cleanupAfterCompletion(status);
}
}
总结
-
将事务信息设置到 事务同步管理器,也就是线程上下文,并封装为
DefaultTransactionStatus
,后面Mybatis
根据DataSource
去事务同步管理器经过两次操作获取线程上下文中对应的ConnectionHolder
。 -
内层事务不会提交或者回滚,只会设置标记,最终提交/回滚交给最外层事务决定。
注意:我说的外层事务内层事务是用的同一个连接的那种,比如 require
,像 require_new
这种,内外事务时单独的两个事务,不会相互影响,各自单独提交回滚。
到此结束
番外
SpringBoot-Mybatis
我们导入依赖
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.3</version>
</dependency>
我们发现
导入了两个自动配置类,我们现在只关注MybatisAutoConfiguration
MybatisAutoConfiguration
@Configuration
// SqlSessionFactoryBean 也是个很重要的类,后面会用到
@ConditionalOnClass({SqlSessionFactory.class, SqlSessionFactoryBean.class})
@ConditionalOnSingleCandidate(DataSource.class)
// MybatisProperties 就是我们yml文件配置的 prefix = "mybatis" 的配置的封装
@EnableConfigurationProperties({MybatisProperties.class})
// 在 DataSourceAutoConfiguration 之后自动配置,为啥?,首先我们要知道 DataSourceAutoConfiguration 是根据yml配置文件注册 DataSource的,后面会向容器注入一个 DataSource 单例
@AutoConfigureAfter({DataSourceAutoConfiguration.class, MybatisLanguageDriverAutoConfiguration.class})
public class MybatisAutoConfiguration implements InitializingBean {
// ... 几个属性,还有一个有参构造函数,其中一个参数为 MybatisProperties ,包含有yml配置文件的 prefix=mybatis 的信息
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
// 看到这里方法的 dataSource 参数里吗,spring自动配置可知,容器里面必须存在 dataSource
// 而dataSource 由 DataSourceAutoConfiguration 装配
// 所以才需要在 DataSourceAutoConfiguration 之后进行装配
// 当然,我们也可以禁用掉 DataSourceAutoConfiguration,然后自己手动注册 dataSource 到容器
factory.setDataSource(dataSource);
// ... 一堆东西,我们不关心,就是给 SqlSessionFactoryBean 设置配置文件的信息
// 创建 SqlSessionFactory 注入到容器
return factory.getObject();
}
}
MybatisProperties
获取yml
文件配置的 prefix = "mybatis"
的配置信息,封装为MybatisProperties
@ConfigurationProperties(
prefix = "mybatis"
)
public class MybatisProperties {
public static final String MYBATIS_PREFIX = "mybatis";
private static final ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
private String configLocation;
private String[] mapperLocations;
private String typeAliasesPackage;
private Class<?> typeAliasesSuperType;
private String typeHandlersPackage;
private boolean checkConfigLocation = false;
private ExecutorType executorType;
private Class<? extends LanguageDriver> defaultScriptingLanguageDriver;
private Properties configurationProperties;
@NestedConfigurationProperty
private Configuration configuration;
}
SqlSessionFactoryBean
public class SqlSessionFactoryBean implements FactoryBean<SqlSessionFactory>, InitializingBean, ApplicationListener<ApplicationEvent> {
// 上面 最后调用getObject()方法创建 SqlSessionFactory
public SqlSessionFactory getObject() throws Exception {
if (this.sqlSessionFactory == null) {
this.afterPropertiesSet();
}
return this.sqlSessionFactory;
}
public void afterPropertiesSet() throws Exception {
// 检查必要参数 不能为空
Assert.notNull(this.dataSource, "Property 'dataSource' is required");
Assert.notNull(this.sqlSessionFactoryBuilder, "Property 'sqlSessionFactoryBuilder' is required");
Assert.state(this.configuration == null && this.configLocation == null || this.configuration == null || this.configLocation == null, "Property 'configuration' and 'configLocation' can not specified with together");
// 熟悉的 buildSqlSessionFactory()
this.sqlSessionFactory = this.buildSqlSessionFactory();
}
}
之后的就是Mybatis的部分了。
注意了,执行XxxMapper
的方法时,获取DataSource
是从configuration
里面获取的,不是从容器获取的。
所以,当我们创建完后SqlSessionFactory
之后,向IOC
容器注入另外一个DataSource
,并使用@Primary
优先获取,那么可能会导致事务失效,为啥?事务开启前从事务管理器获取的,如果我们修改了这个事务管理器持有的DataSource
,促使SqlSessionFactory
里面的DataSource
与事务管理器持有的DataSource
不同,那么导致设置事务的Connection
都不是同一个。