您现在的位置是:首页 >其他 >spring源码篇(八)事务的原理网站首页其他
spring源码篇(八)事务的原理
文章目录
前言
不流于表面,深入理解AOP与事务逻辑的配合完成事务,文章内容有些长,拆了可能影响前后逻辑连贯型,
基本操作
数据库执行事务步骤是这样的:
- 开启事务 begin sql commit autocommit = on
- sql
- commit
- rollback
mysql中一个事务的由begin
开始,commit
结束,在事务中的错误都会回退,而一般情况下都是开启了autocommit=on
,使得每个sql都会commit;
验证
- 一个方法调用两个insert,两个insert的应是单独的连接对象,或者同一个对象,但connection中的autocommit=true
public Object sw() {
Demo demo = new Demo();
demo.setDbName("sw-ces1");
demoMapper.insert(demo);
demo = new Demo();
demo.setDbName("sw-ces2");
demoMapper.insert(demo);
return null;
}
这个方法调用了两次insert,生成了两个连接实例对象connection
,并且auto commit=true如下图:
方法定位:org.mybatis.spring.SqlSessionTemplate#insert(java.lang.String, java.lang.Object)
- 在Spring事务中,应是同一个connection对象,保证一些列的数据库操作在同一个事务中完成。
那么,打上注解:@Transactional
@Transactional
public Object sw() {
Demo demo = new Demo();
demo.setDbName("sw-ces1");
demoMapper.insert(demo);
demo = new Demo();
demo.setDbName("sw-ces2");
demoMapper.insert(demo);
return null;
}
再次查看connection对象,会发现是两个insert的connection对象都是同一个,并且autocommit=false
org.mybatis.spring.SqlSessionTemplate#insert(java.lang.String, java.lang.Object)
那么到这里,spring的事务基本轮廓已经清晰了,通过spring对connection的管理以达到事务的控制。
Spring事务的传播机制
Spring事务的管理,确实是通过connect的管理来实现的,不过这也只是我们片面的理解,Spring事务的能做到的比我们想象中要多一些;下面是spring官方的特性描述:数据访问 (spring.io)
在Spring中,它以枚举定义了7种类型:
-
REQUIRED(默认的):保证有事务注解的的方法及其方法内部调用的方法所使用的connection为同一个,并且在同一个事务中。
-
REQUIRES_NEW:当存在这个类型时,spring会重新创建一个连接并开启新事务。
-
NESTED:嵌套式的传播机制,如果有外层事务,则会嵌套一个事务,但是该事务也在外层事务的作用范围内。
-
SUPPORTS:如果外层没有事务,就不加事务,但是内部事务生效
-
MANDATORY:使用当前事务,如果当前事务不存在,则抛出Exception
-
NOT_SUPPORTED:方法调用链期间,当出现
NOT_SUPPORTED
时,事务打断不生效,在接下去的调用链中,出现REQUIRED
时,又会重新生效。 -
NEVER:非事务方式,如果方法内部有开启事务,会抛异常。
它的使用方式简单的就是在注解@Transactional
里标准类型就可以了
@Transactional(rollbackFor = Exception.class,propagation = Propagation.NESTED)
特殊的机制说明
这里通过使用的效果进行说明,后面看源码时,容易理解。
NOT_SUPPORTED
方法调用链期间,当出现NOT_SUPPORTED
时,事务打断不生效,在接下去的调用链中,出现REQUIRED
时,又会重新生效,
/**
* 外层 事务 生效
*/
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
public void out(){
Demo demo = new Demo();
demo.setName("NOT_SUPPORTED外层事务");
demoMapper.insert(demo);
// 调用标记NOT_SUPPORTED了的方法
applicationContext.getBean(DemoService.class).in();
}
// 中间层 NOT_SUPPORTED 没有事务
@Transactional(rollbackFor = Exception.class,propagation = Propagation.NOT_SUPPORTED)
public void in() {
Demo demo = new Demo();
demo.setName("NOT_SUPPORTED内层事务");
demoMapper.insert(demo);
// 调用开启事务的方法
// applicationContext.getBean(DemoService.class).in2();
applicationContext.getBean(DemoService.class).in22();
}
// 下层 事务 生效
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
public void in2() {
Demo demo = new Demo();
demo.setName("NOT_SUPPORTED内层 内层 事务");
demoMapper.insert(demo);
int i = 1/0;
}
// 下层 事务 不生效
public void in22() {
Demo demo = new Demo();
demo.setName("NOT_SUPPORTED内层 内层 事务2");
demoMapper.insert(demo);
int i = 1/0;
}
结果如下两个:
NESTED
嵌套式的传播机制,如果有外层事务,则会嵌套一个事务,但是该事务也在外层事务的作用范围内。
NESTED
可以独立回滚事务NESTED
事务方法的异常被外层吃掉,外层事务和NESTED
就相互独立,如游戏存档- 如果外层异常,内部的
NESTED
事务也会回滚
/**
外层 事务
*/
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
public void out2() {
Demo demo = new Demo();
demo.setName("NESTED外层事务");
demoMapper.insert(demo);
try {
applicationContext.getBean(DemoService.class).in3();
} catch (Exception e) {
System.out.println("NESTED方法异常");
}
// 如果,这里报异常,内部的`NESTED`事务也会回滚
// int i = 1/0;
}
/**
中间层 NESTED 事务
* 1. NESTED 可以独立回滚;如果出现异常,那么这个方法内的所有数据回滚
* 2. 如果这里是REQUIRED,那么包括外层都会回滚
*/
@Transactional(rollbackFor = Exception.class,propagation = Propagation.NESTED)
public void in3() {
Demo demo = new Demo();
demo.setName("NESTED内层事务");
demoMapper.insert(demo);
applicationContext.getBean(DemoService.class).in4();
}
// 下层 事务
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
public void in4() {
Demo demo = new Demo();
demo.setName("NESTED内层 内层 事务");
demoMapper.insert(demo);
int i = 1/0;
}
SUPPORTS
SUPPORT
如果外层没有事务,则就不加事务,但是内部事务生效,如下,
// 外层 没事务
public void out3() {
Demo demo = new Demo();
demo.setName("SUPPORTS外层事务");
demoMapper.insert(demo);
applicationContext.getBean(DemoService.class).in6();
}
// 中间层 SUPPORTS没事务
@Transactional(rollbackFor = Exception.class,propagation = Propagation.SUPPORTS)
public void in6() {
Demo demo = new Demo();
demo.setName("SUPPORTS内层事务");
demoMapper.insert(demo);
applicationContext.getBean(DemoService.class).in4();
}
// 下层 开启事务,并且生效
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRED)
public void in4() {
Demo demo = new Demo();
demo.setName("NESTED内层 内层 事务");
demoMapper.insert(demo);
int i = 1/0;
}
源码
加载事务自动配置类
spring中的很多组件都是通过自动配置类来加载的,如果是spring自己的,那么就在spring-boot-autoconfigure
下的spring.factories
中,我们可以在里面找到下面这个配置类:
org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration
要不要加注解:@EnableTransactionManagement
其实可以不用的,在@SpringBootApplication
里的@EnableAutoConfiguration
会去取spring.factories
,里面有TransactionAutoConfiguration
类,我们看一下它的其中一部分:
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnBean({TransactionManager.class})
@ConditionalOnMissingBean({AbstractTransactionManagementConfiguration.class})
public static class EnableTransactionManagementConfiguration {
public EnableTransactionManagementConfiguration() {
}
@Configuration(
proxyBeanMethods = false
)
// 启动事务注解
@EnableTransactionManagement(
proxyTargetClass = true
)
@ConditionalOnProperty(
prefix = "spring.aop",
name = {"proxy-target-class"},
havingValue = "true",
matchIfMissing = true
)
public static class CglibAutoProxyConfiguration {
public CglibAutoProxyConfiguration() {
}
}
@Configuration(
proxyBeanMethods = false
)
@EnableTransactionManagement(
proxyTargetClass = false
)
@ConditionalOnProperty(
prefix = "spring.aop",
name = {"proxy-target-class"},
havingValue = "false",
matchIfMissing = false
)
public static class JdkDynamicAutoProxyConfiguration {
public JdkDynamicAutoProxyConfiguration() {
}
}
}
如上,它会注入一个EnableTransactionManagementConfiguration
配置类,然后扫描到注解@EnableTransactionManagement
,所以,可以不用特意加上注解。
配置类说明
TransactionAutoConfiguration
配置类头上有加如下的注解:
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnClass({PlatformTransactionManager.class})
@AutoConfigureAfter({JtaAutoConfiguration.class, HibernateJpaAutoConfiguration.class, DataSourceTransactionManagerAutoConfiguration.class, Neo4jDataAutoConfiguration.class})
@EnableConfigurationProperties({TransactionProperties.class})
@Configuration
:被spring识别为配置类的标识,这里大家思考一下,被spring识别为配置类的的标识有哪些注解?
@ConditionalOnClass({PlatformTransactionManager.class})
:这个是当存在PlatformTransactionManager
类时才进行配置类加载
@AutoConfigureAfter({JtaAutoConfiguration.class, HibernateJpaAutoConfiguration.class, DataSourceTransactionManagerAutoConfiguration.class, Neo4jDataAutoConfiguration.class})
:
这个是在spring加载这些配置类后才进行该配置类的加载,那么,如果你引入spring-data-jdbc的话,就会存在DataSourceTransactionManagerAutoConfiguration
这个配置类,同时也看出,它支持:jta,jpa,jdbc,neo4j。
@EnableConfigurationProperties({TransactionProperties.class})
:事务配置参数,可以定义spring.transaction.defaultTimeout/rollbackOnCommitFailure
属性
然后重点就是EnableTransactionManagement
;
EnableTransactionManagement 做了什么
这个注解两个重要的属性:
proxyTargetClass:默认值:false;true:表示可以使用CGLIB代理,如果有接口则还是用jdk动态代理,false:使用JDK动态代理
mode: 默认值:PROXY
;值为PROXY
时表示用JDK动态代理,ASPECTJ
表示使用Spring的@Aspect
进行切面代理;要注意的是,使用PROXY
对同一个类内部的本地调用无法拦截,如果要实现高级的拦截,要使用ASPECTJ
AutoProxyRegistrar做了什么
AutoProxyRegistrar
的作用是注册了beanName为internalAutoProxyCreator
,类为InfrastructureAdvisorAutoProxyCreator
的beanDefinition,该类继承AbstractAdvisorAutoProxyCreator
。
AbstractAdvisorAutoProxyCreator:这个类在AOP章节提过,他是AOP代理自动创建类,主要做下面几个操作:
- 首先在初始化bean后,会调用它的
postProcessAfterInitialization
方法进行代理类处理 - 判断是该bean是否需要被代理
- 创建代理对象(这里创建的代理会根据对象进行判断)
AOP代理和事务代理,他们都实现了同一个AbstractAdvisorAutoProxyCreator
,然后进行了自定义的实现,用于适配不同场景的功能。
接下来,看一下AutoProxyRegistrar
实现,因为它实现ImportBeanDefinitionRegistrar
,所以它这里做了注册bean的逻辑操作。
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
boolean candidateFound = false;
// 获取配置类CglibAutoProxyConfiguration上的注解
Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
for (String annType : annTypes) {
// 获取注解上的属性
AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (candidate == null) {
continue;
}
// 这个属性应该只有这里用到,用于判断是否创建代理
Object mode = candidate.get("mode");
// 这个属性它对应的是`proxyFactory`里的proxyTargetClass的,表示可以使用cglib代理
Object proxyTargetClass = candidate.get("proxyTargetClass");
if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
Boolean.class == proxyTargetClass.getClass()) {
candidateFound = true;
if (mode == AdviceMode.PROXY) {
// 如果不存在 InfrastructureAdvisorAutoProxyCreator 则注入一个:
// beanName=internalAutoProxyCreator,
// class=InfrastructureAdvisorAutoProxyCreator
// 的beanDefinition
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
if ((Boolean) proxyTargetClass) {
// 强制使用代理类设置,这里是把`proxyTargetClass`设置到InfrastructureAdvisorAutoProxyCreator
AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
return;
}
}
}
}
if (!candidateFound && logger.isInfoEnabled()) {
String name = getClass().getSimpleName();
// ......
}
}
看一下这里:
它这里设置了一个属性proxyTargetClass
到InfrastructureAdvisorAutoProxyCreator
里,internalAutoProxyCreator
是 InfrastructureAdvisorAutoProxyCreator
的beanName,忘记了可以再看下上面代码的注释;
这里.add("proxyTargetClass", Boolean.TRUE);
,在这个类中,并没有proxyTargetClass
属性,所以这里用了add,这里关系到下面的逻辑,所以这里提一下,其实它并没有实际上的意义。
创建的代理类是jdk动态代理还是cglib
结果:只会是CGLIB代理,不是JDK代理,理由看下面。
注解EnableTransactionManagement
中设置了proxyTargetClass = true
,表示可以使用CGLIB,看下面创建代理类的代码
@Override
public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
// 判断是否需要优化,是否使用cglib代理,是否指定了接口
// 如果不需要优化,且没有指定要使用cglib代理,而且没有指定接口,就使用JDK代理
if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
Class<?> targetClass = config.getTargetClass();
if (targetClass == null) {
throw new AopConfigException("xxxxx");
}
// 判断代理类是否是一个接口,或者是否代理类
// 如果是则使用JDK代理
if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
return new JdkDynamicAopProxy(config);
}
return new ObjenesisCglibAopProxy(config);
}
else {
return new JdkDynamicAopProxy(config);
}
}
这个方法有三个判断(第二个忽略):
- 第一个
if
:判断是否需要优化,是否使用cglib代理,是否指定了接口:首先在注册创建类时,就指定了使用cblib代理(config.isProxyTargetClass()=true
),同时这个条件生效时,不进行接口的查找,没有设置过接口对象,即hasNoUserSuppliedProxyInterfaces(config)=false
,但这里是或,所以第一个判断进入 - 第三个
if
:判断targetClass
是否是一个接口类型,注意,这里的代码时bean初始化后执行的,所以这个targetClass不是一个接口类型,所以targetClass.isInterface()=false
,并且我们正要代理事务是吧,所以Proxy.isProxyClass(targetClass)=false
,那么第三个if没有进入,直接return new ObjenesisCglibAopProxy(config);
图解:
之前在AOP章节里,有说到过,在spring的自动代理中,如果代理的对象有接口,那么会使用JDK代理,没有的就使用cglib;
而在这里,因为属性proxyTargetClass=true
的影响,会走到第三个if判断,而且第三个的if都是false,所以都是cglib代理。
总结:
- 在
EnableTransactionManagementConfiguration
配置类中配置了两个注解@EnableTransactionManagement
并且属性都不同,但只要其中有一个的属性是:proxyTargetClass = true
就可以的,为true时,会注册事务代理创建类InfrastructureAdvisorAutoProxyCreator
,并将设置proxyTargetClass = true
设置进去,在创建代理类时跳过接口的查找和设置,最后创建CGLIB代理(我的结论和网上不一样,网上说的是会根据是否实现接口使用不同的代理方式,但我看到的并不是这样) AutoProxyRegister
的作用主要是创建事务自动代理创建类
ProxyTransactionManagementConfiguration做了什么
ProxyTransactionManagementConfiguration
的作用主要是注册了切点pointCut、代理逻辑advice。
这里@Role
忽略掉,不重要。
这里注入了3个bean如下3个bean,以完成代理对象拦截代理逻辑
TransactionInterceptor:事务的advice,继承于TransactionAspectSupport
,事务的核心类,事务的管理,提交,回顾等各种处理细节由它实现。
TransactionAttributeSource:提供事务注解@Transactional
的属性获取,以及是否切点的判断()
BeanFactoryTransactionAttributeSourceAdvisor:继承AbstractPointcutAdvisor
,事务的advisor:pointCut + advice,最后就是ProxyFactory中获取的advisor;
txManager:事务管理器,类:TransactionManager
,由父类注入
这里说明一下TransactionAttributeSource
为何没有实现Pointcut
,但是切点。
其实这里BeanFactoryTransactionAttributeSourceAdvisor
的切点逻辑是代理给了TransactionAttributeSource
处理的。
总结:
- 注册了代理需要的切点(什么类需要代理);
- 代理逻辑(开启事务、执行jdbc、回滚等详细的逻辑),包含事务管理器的注入
事务代理过程
他们都继承自AbstractAdvisorAutoProxyCreator
,并且也都是BeanPostProcessor
的子类,而且InfrastructureAdvisorAutoProxyCreator
并没有重写什么方法,所以创建代理的过程和AOP章节是一样的:
初始化bean后置处理器调用:
在AutoProxyregistrar
中进行了proxyTargetClass
的设置,就是通过proxyFactory.copyFrom(this);
代码复制的:这里的this就是InfrastructureAdvisorAutoProxyCreator
,继承ProxyConfig
,虽然它的类里没有定义属性proxyTargetClass
,但是在AutoProxyregistrar
中手动的为它的beanDefinition添加了这个属性。
所以,不会走下面的代码,直接就进行advisor的获取
if (!proxyFactory.isProxyTargetClass()) {
if (shouldProxyTargetClass(beanClass, beanName)) {
proxyFactory.setProxyTargetClass(true);
}
else {
evaluateProxyInterfaces(beanClass, proxyFactory);
}
}
到这,就已经生成了每个事务代理对象。
如何完成的事务代理
好的,这里我们直接从TransactionInterceptor
看,他是代理方法执行的入口,原因看上面,有解释。
位置:org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction
// tas提供事务注解`@Transactional`的属性获取,以及是否切点的判断
TransactionAttributeSource tas = getTransactionAttributeSource();
// tas 绝对不会为null,在自动配置初始化就已经设置进去了
// 获取方法上的@Transactional属性信息
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取事务管理器,
// 因为自动配置(TransactionAutoConfiguration)注入的是DataSourceTransactionManagerAutoConfiguration,
// 所以这里的tm是DataSourceTransactionManager
final TransactionManager tm = determineTransactionManager(txAttr);
// 这段是reactive的代码,不用理会
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
// ......
}
// 这里是转换为父类,面向接口
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// 生成事务方法标识
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// 事务信息
// 0.生成事务标识
// 1. 先创建数据源对象DataSourceTransactionObject
// 2. 从线程缓存中获取connectionHolder
// 3. 两个分支:
// 3.1 没有存在事务
// 3.1.1. 判断是否需要开启事务:REQUIRED/REQUIRES_NEW/NESTED
// 3.1.2. 创建事务维护对象TransactionStatus
// 3.1.3. 从DataSource中获取一个connection,并设置到connectionHolder中
// 3.1.4. 设置事务属性:状态,事务级别,只读false
// 3.1.5. 禁止自动提交:autoCommit=false
// 3.1.6. 设置事务激活状态,超时时间
// 3.1.7. 将connectionHolder以DataSource为key设置到线程缓存resource中
// 3.1.8. 激活当前事务:设置事务状态、属性等到当前线程变量里
// 4. 将事务线程里的事务信息设置到当前的事务信息里,用于回退,然后绑定当前的事务信息到线程缓存里
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 执行业务方法逻辑
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 会先判断是否是我们指定的异常,如果是就会滚,如果不是,继续commit
// 这里会恢复被挂起的事务信息
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清除事务信息
// 回退到上一个事务信息,如果上一个事务信息不存在,那么就结束事务了
cleanupTransactionInfo(txInfo);
}
// vavr类存在才会走,不用理会
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
这段是一个完整的事务处理,进入createTransactionIfNecessary
方法,看它如何获取的
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 如果事务属性的名称不存在,则将之前创建的事务标识设置进去
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
// 事务维护对象:它保存了事务信息对象、事务的是否提交、是否回滚、和挂起的resource等信息
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 获取事务
// 1. 先创建数据源对象DataSourceTransactionObject
// 2. 从线程缓存中获取connectionHolder
// 3. 两个分支:
// 3.1 没有存在事务
// 3.1.1. 判断是否需要开启事务:REQUIRED/REQUIRES_NEW/NESTED
// 3.1.2. 创建事务维护对象TransactionStatus
// 3.1.3. 从DataSource中获取一个connection,并设置到connectionHolder中
// 3.1.4. 设置事务属性:状态,事务级别,只读false
// 3.1.5. 禁止自动提交:autoCommit=false
// 3.1.6. 设置事务激活状态,超时时间
// 3.1.7. 将connectionHolder以DataSource为key设置到线程缓存resource中
// 3.1.8. 激活当前事务:设置事务状态、属性等到当前线程变量里
// 3.2 存在事务
// 3.2.1. NEVER传播机制:不允许有事务,报错
// 3.2.2. NOT_SUPPORTED传播机制:不支持事务,
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 包装对象
// 4. 将事务线程里的事务信息设置到当前的事务信息里,用于回退,然后绑定当前的事务信息到线程缓存里
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
进入getTransaction
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// 事务定义信息
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 获取当前线程的事务对象
// 1. 先创建数据源对象DataSourceTransactionObject
// 2. 从线程缓存中获取connectionHolder
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 判断是否有连接处理器和事务是否已开启
if (isExistingTransaction(transaction)) {
// 存在事务,那么安装事务传播机制执行
return handleExistingTransaction(def, transaction, debugEnabled);
}
//3.1 没有存在事务
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// MANDATORY传播机制:如果当前事务不存在,则抛出Exception
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// REQUIRED默认的传播机制
// REQUIRES_NEW新事务传播机制
// NESTED嵌套传播机制
// 除开这3种,其他的都不需要开启事务
// 3.1.1. 判断是否需要开启事务:REQUIRED/REQUIRES_NEW/NESTED
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 创建挂起的事务信息,第一次开启事务,这里suspendedResources=null,就是没有可以被挂起的
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 开启新事务
// 初始化当前线程里的事务信息
// 3.1.2. 创建事务维护对象TransactionStatus
// 3.1.3. 从DataSource中获取一个connection,并设置到connectionHolder中
// 3.1.4. 设置事务属性:状态,事务级别,只读false
// 3.1.5. 禁止自动提交:autoCommit=false
// 3.1.6. 设置事务激活状态,超时时间
// 3.1.7. 将connectionHolder以DataSource为key设置到线程缓存resource中
// 3.1.8. 激活当前事务:设置事务状态、属性等到当前线程变量里
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
第一次开启事务怎么执行
进入doGetTransaction
protected Object doGetTransaction() {
// 创建数据源事务对象
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// 初始默认设置,默认是false
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 通过dataSource从线程中获取对应的连接处理器,用于管理数据库连接行为
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
// 将connectingHolder设置到数据源事务对象中,之后判断一个事务是否存在,就是通过这个对象,判断是否有connectionHolder
// 因为不管执行多少次,都是会走这一步
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
TransactionSynchronizationManager是同步机制的中的事务管理对象,它内部有NamedThreadLocal
线程级变量,key为dataSource
,value为ConnectionHolder
。
进入startTransaction
里的doBegin
方法
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 当没有connection处理器,或者事务不同步时就创建一个连接connection处理器
// 这里可以直接理解为,没有连接就获取一个connection,生成connection处理器
// 在上一步中,transaction对象,是从线程缓存中获取的,
// 如果线程缓存中存在connectionHolder,(txObject.hasConnectionHolder() = true)
// 那就表示已经走过这个方法:isSynchronizedWithTransaction()=true
// 如果txObject.hasConnectionHolder()=false,那isSynchronizedWithTransaction()也是false(默认值)
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 从DataSource中获取新连接
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 生成connectionHolder,并设置到当前的事务对象里
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 更改状态 isSynchronizedWithTransaction() = true
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// 获取最终的connection
con = txObject.getConnectionHolder().getConnection();
// 把事务信息设置到connection
// 并且,如果connection获取到的事务基本和@Transaction注解的基本不一样,就返回connection的事务级别,注意,connection是可能是上一次的,也可能是新创建的
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
// 将获取到的上一次事务的事务级别设置到当前的事务对象里,这里可以理解成事务的传播
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 默认false
txObject.setReadOnly(definition.isReadOnly());
// 这里就是禁止自动提交,一切交由事务管理器去做
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
// 设置事务是否可读
prepareTransactionalConnection(con, definition);
// 设置事务激活状态
txObject.getConnectionHolder().setTransactionActive(true);
// 事务的超时时间,默认无限制
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
// 如果是新的connectionHolder那就设置到线程缓存中
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 3.1.2. 创建事务维护对象TransactionStatus
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 开启事务
// 3.1.3. 从DataSource中获取一个connection,并设置到connectionHolder中
// 3.1.4. 设置事务属性:状态,上一次的事务级别,只读false
// 3.1.5. 禁止自动提交:autoCommit=false
// 3.1.6. 设置事务激活状态,超时时间
// 3.1.7. 将connectionHolder以DataSource为key设置到线程缓存resource中
doBegin(transaction, definition);
// 3.1.8. 激活当前事务:设置事务状态、属性等到当前线程变量里
prepareSynchronization(status, definition);
return status;
}
最后一步:prepareTransactionInfo
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
// 创建事务信息
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists.
txInfo.newTransactionStatus(status);
}
else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}
// 这里是将当前线程里缓存的事务信息绑定到当前的事务信息里(oldTransactionInfo)
// 怎么说呢,这里其实就是将上一次的事务信息从线程缓存里拿到,然后设置到当前事务里,在需要回退到上一个状态时用
txInfo.bindToThread();
return txInfo;
}
那么这里接着往下,应该是执行业务逻辑,以及回滚和提交。
我们再回到方法:
org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction
看回滚的方法:
我们经常写@Transaction
时,会添加rollbackFor
属性,就是在这里使用的
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 判断是否符合我们指定的异常
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 回滚
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
// 那不是我们指定的异常,则继续提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
然后是清除事务信息cleanupTransactionInfo
,底层是下面这段:
private void restoreThreadLocalStatus() {
// Use stack to restore old transaction TransactionInfo.
// Will be null if none was set.
transactionInfoHolder.set(this.oldTransactionInfo);
}
这里也就对应了txInfo.bindToThread();
这个方法,这里是回退到上一个事务信息,如果有的话
然后这一段:
能走到这里的是CallbackPreferringPlatformTransactionManager
类,看其子类是JTA的,所以这里就没跟了,但应也是一样的逻辑。
已存在事务怎么执行
如果一开始有事务的话,
org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// NEVER传播机制,是不允许有事务的
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// NOT_SUPPORTED传播机制,出现这个时,当前注解下的都没有事务,也没不要开启,所以没有doBegin
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 获取被挂起的事务信息,能走这段代码就说明,已存在事务,所以这里获取的是上一次开启的事务
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 生成一个新的没有事务信息TransactionStatus回去,同时也把上一次挂起的事务信息设置进去了
// 挂起的事务信息在这次事务提交后或是异常时,是要进行操作的
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// REQUIRES_NEW新事物传播机制
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 同样,挂起事务
// 获取被挂起的事务信息,能走这段代码就说明,已存在事务,所以这里获取的是上一次开启的事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 开启事务,和上面解释过的方法的是同一个方法,同时也把上一次挂起的事务信息设置进去了
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// NESTED传播机制:建立保存点,异常后回退到上一个保存点
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
// 默认true
if (useSavepointForNestedTransaction()) {
// 这里生成一个新的事务维护对象,newTransaction=false,这里用的还是之前的事务
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 这里是:getConnection().setSavepoint(SAVEPOINT_NAME_PREFIX + this.savepointCounter)
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
// 是否验证事务
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
// 如果当前的事务级别不存在,或者与指定的级别不同,报错
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
// 如果只读,报错
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 生成信息的事务维护对象
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
事务挂起恢复
已经存在事务时,再开启事务,且传播机制是:NOT_SUPPORTED,REQUIRES_NEW
时,会将之前的connectionHolder拿到,然后设置到SuspendedResourcesHolder
挂起对象中,再将当前线程里的事务信息都清空,包括connectionHolder,那么后面就会去拿新的connection
;而这个挂起对象,会设置到TransactionStatus
事务维护对象中,在当前的事务执行完,或者异常时,就会恢复这个挂起对象。
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
// 激活状态,这个只要执行了startTransaction方法都是true,即事务的已开启
// 会执行的有:REQUIRES_NEW/NESTED传播机制
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 清除掉:transaction里的connectionHolder,线程缓存里的connectionHolder
// 并返回清除的对象,也就是旧的connectionHolder
suspendedResources = doSuspend(transaction);
}
// 清除当前线程的事务信息:事务名称,只读状态,隔离级别等
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
// 再生成挂起的事务对象,并返回,之后再commit后或者rollback后使用
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
// 然后这里,我不知道它会在什么情况下会走到这里
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// 这里就是第一次开启事务时会返回,也就表示没有可被挂起的对象
return null;
}
}
rollback
- 执行
cleanupTransactionInfo
将事务对象TransactionInfo
恢复到上一次事务
org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction
出现异常会先执行cleanupTransactionInfo
,它底层是:
private void restoreThreadLocalStatus() {
// 如果时第一次开启的事务,oldTransactionInfo为null,否则,oldTransactionInfo是上一次的事务
transactionInfoHolder.set(this.oldTransactionInfo);
}
oldTransactionInfo
对象在开启新事物时(REQUIRED/REQUIRES_NEW/NESTED
)绑定到线程,也就是bindToThread()
,这时oldTransactionInfo
取的transactionInfoHolder
,如果这时开启的事务之前已经开过了,那么这里,的transactionInfoHolder
就是上一次的事务对象,就会形成一个类似链表的存在,每个节点都有两个属性:oldTransactionInfo
上一个事务,transactionInfoHolder
当前事务,那么也很容易形成链式反应。
private void bindToThread() {
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 这里默认RuntimeException 和 Error 会被回退
// 判断是否是我们指定的异常,即是否需要回滚
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
// 不需要回滚,则继续提交
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
再往里走
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
// 前置方法,可用于扩展
triggerBeforeCompletion(status);
// 如果要保存点,也就是NESTED传播机制
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
// 回滚到上一个保存点
status.rollbackToHeldSavepoint();
}
// 新事物
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 回滚:connection.rolaback
doRollback(status);
}
// 不是上面两种情况的走这里
else {
// 这里判断参与事务的某个方法异常,是否需要全局回滚
// 设想一下,什么情况会进来?
// NESTED REQUIRES_NEW这两种都机制都在上面处理了,那么下面处理的就是REQUIRED
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
// 标记属性rollbackOnly=true
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
// 立即终止事务执行
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 后置方法
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
// 这个是重要方法:回退到挂载点
cleanupAfterCompletion(status);
}
}
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
// 这里就是恢复挂载点的逻辑
// 会生成挂载点的只有:NOT_SUPPORTED,REQUIRES_NEW
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
在恢复挂载点后,线程还是同一个,事务信息已经变成旧的信息,而我们的事务执行依靠代理,所以,外层代理时就是上一次的事务信息。
同样的commit里也有一样的方法cleanupAfterCompletion
。