您现在的位置是:首页 >技术交流 >spring-transaction源码分析(5)TransactionInterceptor事务拦截逻辑网站首页技术交流
spring-transaction源码分析(5)TransactionInterceptor事务拦截逻辑
spring-tx的事务拦截逻辑在TransactionInterceptor类,本文将详细分析其实现方式。
事务拦截器TransactionInterceptor
spring-tx的事务拦截逻辑在TransactionInterceptor类,它实现了MethodInterceptor接口。
MethodInterceptor接口
MethodInterceptor接口的实现类封装aop切面拦截逻辑:
public interface MethodInterceptor extends Interceptor {
/**
* Implement this method to perform extra treatments before and after the invocation.
*/
Object invoke(MethodInvocation invocation) throws Throwable;
}
TransactionInterceptor类
TransactionInterceptor类封装了事务拦截逻辑:
public class TransactionInterceptor
extends TransactionAspectSupport implements MethodInterceptor, Serializable {
// ...
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass =
(invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
// ...
}
事务逻辑在父类TransactionAspectSupport的invokeWithinTransaction方法中。
invokeWithinTransaction方法
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 如果方法没有被Transactional注解标注,则返回null
// 返回的是AnnotationTransactionAttributeSource对象
// 用于获取Transactional注解相关属性,
// 比如rollbackOn, propagationBehavior, isolationLevel等
TransactionAttributeSource tas = getTransactionAttributeSource();
TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取事务管理器
// 后面会转型成PlatformTransactionManager对象,可以开启事务、提交、回滚
TransactionManager tm = determineTransactionManager(txAttr);
// Reactive事务,略
// 转型成PlatformTransactionManager对象,可以开启事务、提交、回滚
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// 获取事务方法的唯一标识
// 格式为"类名.方法名"
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创建事务
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
// Handle a throwable, completing the transaction.
// We may commit or roll back, depending on the configuration.
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
// Reset the TransactionInfo ThreadLocal.
// Call this in all cases: exception or normal return!
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
} else {
// CallbackPreferringPlatformTransactionManager事务管理器逻辑,略
}
}
创建事务
Create a transaction if necessary based on the given TransactionAttribute. Allows callers to perform custom TransactionAttribute lookups through the TransactionAttributeSource.
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm,
TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 创建新事务或者返回已存在事务, 这取决于传播级别。
// 隔离级别或超时等参数只在新事务时生效,已存在事务会忽略这些参数。
status = tm.getTransaction(txAttr);
}
}
// 使用指定的事务属性和TransactionStatus创建TransactionInfo
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
getTransaction(txAttr)
这个方法创建新事务或者返回已存在事务,这取决于传播级别。隔离级别或超时等参数只在新事务时生效,已存在事务会忽略这些参数。
public final TransactionStatus getTransaction(TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// DataSourceTransactionManager实现类返回DataSourceTransactionObject对象
// DataSourceTransactionObject对象封装着数据库连接、previousIsolationLevel、readOnly、savepointAllowed等
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
// 传播级别设置为PROPAGATION_MANDATORY时,如果当前没有事务,则抛出异常
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
// Creating new transaction
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
} catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
} else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
doGetTransaction()
protected Object doGetTransaction() {
// DataSourceTransactionObject封装着数据库连接、previousIsolationLevel、readOnly、savepointAllowed等
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// 是否允许设置保存点,NESTED传播级别时使用,DataSourceTransactionManager类型该属性为true
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 从ThreadLocal获取当前线程上绑定的ConnectionHolder
// ConnectionHolder对象保存着数据库连接
// 业务方法第一次执行时为null
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
当前有事务
handleExistingTransaction方法
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
isExistingTransaction方法判断当前是否存在事务:
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 判断存在数据库连接且开启了事务
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
handleExistingTransaction方法:
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 传播级别为NEVER
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
// 传播级别设置为NEVER时,如果当前有事务,则抛出异常
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// 传播级别为NOT_SUPPORTED
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
// 挂起当前事务
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 传播级别为REQUIRES_NEW
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
// 挂起当前事务,然后创建新事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
} catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// 传播级别为NESTED
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
// Creating nested transaction
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// 传播级别为SUPPORTS/REQUIRED
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel =
TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException(
"Participating transaction with definition [" + definition +
"] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel,
DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
传播级别为NEVER
传播级别设置为NEVER时,如果当前有事务,抛出异常:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
传播级别为NOT_SUPPORTED
挂起当前事务,业务方法以无事务方式执行:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
// 挂起当前事务
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
prepareTransactionStatus方法:
- 创建DefaultTransactionStatus对象,把SuspendedResources封装进去,以便后续恢复旧事务
- 使用TransactionSynchronizationManager将事务属性绑定到当前线程
- 初始化当前线程TransactionSynchronization集
由于传播级别为NOT_SUPPORTED所以此处不会开启事务。
传播级别为REQUIRES_NEW
挂起当前事务,创建新事务:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
// 挂起当前事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 开启新事务
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
} catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
开启新事务:
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, SuspendedResourcesHolder suspendedResources) {
// 值为true
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 开启新事务
doBegin(transaction, definition);
// 初始化当前线程TransactionSynchronization集
prepareSynchronization(status, definition);
return status;
}
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 打开一个新连接
Connection newCon = obtainDataSource().getConnection();
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
// 设置手动提交
con.setAutoCommit(false);
}
// The default implementation executes a "SET TRANSACTION READ ONLY" statement
// if the "enforceReadOnly" flag is set to true and the transaction definition
// indicates a read-only transaction.
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager
.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
传播级别为NESTED
为当前连接设置保存点,如果业务方法出现异常,会回滚到该保存点位置:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
// Creating nested transaction
// 默认就是true
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 设置保存点
status.createAndHoldSavepoint();
return status;
} else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
设置保存点:
public void createAndHoldSavepoint() throws TransactionException {
setSavepoint(getSavepointManager().createSavepoint());
}
// JdbcTransactionObjectSupport#createSavepoint
public Object createSavepoint() throws TransactionException {
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
try {
if (!conHolder.supportsSavepoints()) {
throw new NestedTransactionNotSupportedException("不支持");
}
if (conHolder.isRollbackOnly()) {
throw new CannotCreateTransactionException("只读");
}
// 使用jdbc设置保存点
return conHolder.createSavepoint();
} catch (SQLException ex) {
throw new CannotCreateTransactionException("Could not create JDBC savepoint", ex);
}
}
传播级别为SUPPORTS/REQUIRED/MANDATORY
// 默认false
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel =
TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException(
"Participating transaction with definition [" + definition +
"] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel,
DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
事务挂起
把当前线程上绑定的资源、事务配置信息移除封装到SuspendedResourcesHolder对象,传递给新创建的TransactionStatus对象,以便在业务方法执行结束后恢复旧事务:
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
// 判断当前线程的Set<TransactionSynchronization>已经存在
// TransactionSynchronization: 事务回调同步器,定义了事务挂起、恢复等方法
// 例如mybatis-spring中有SqlSessionSynchronization实现类
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 挂起事务
suspendedResources = doSuspend(transaction);
}
// 清除当前线程事务配置参数:事务名、只读属性、隔离级别等
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel =
TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
// 把当前线程的事务相关信息封装起来以便后续恢复
return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations,
name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
} else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
} else {
// Neither transaction nor synchronization active.
return null;
}
}
private List<TransactionSynchronization> doSuspendSynchronization() {
List<TransactionSynchronization> suspendedSynchronizations =
TransactionSynchronizationManager.getSynchronizations();
// 挂起所有的TransactionSynchronization
// 比如SqlSessionSynchronization实现类会清除当前线程的SessionFactory
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
synchronization.suspend();
}
// 清除线程上的TransactionSynchronization集
TransactionSynchronizationManager.clearSynchronization();
return suspendedSynchronizations;
}
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
// ConnectionHolder对象
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
事务恢复
protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
// 恢复之前挂起的是ConnectionHolder对象
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations =
resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager
.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
doResumeSynchronization(suspendedSynchronizations);
}
}
}
protected void doResume(Object transaction, Object suspendedResources) {
// 恢复之前挂起的是ConnectionHolder对象
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}
当前无事务
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
// 传播级别设置为PROPAGATION_MANDATORY时,如果当前没有事务,则抛出异常
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
// Creating new transaction
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
} catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
} else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
创建TransactionInfo
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
TransactionAttribute txAttr, String joinpointIdentification,
TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// The transaction manager will flag an error if an incompatible tx already exists.
txInfo.newTransactionStatus(status);
} else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
}
// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
txInfo.bindToThread();
return txInfo;
}
异常处理
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
} else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
回滚:
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
// 回滚到指定保存点
status.rollbackToHeldSavepoint();
} else if (status.isNewTransaction()) {
// 事务回滚
doRollback(status);
} else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
// 设置rollback-only
doSetRollbackOnly(status);
}
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
} catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
} finally {
// 这里面有恢复挂起事务的逻辑
cleanupAfterCompletion(status);
}
}
// 回滚到指定保存点
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
try {
conHolder.getConnection().rollback((Savepoint) savepoint);
conHolder.resetRollbackOnly();
} catch (Throwable ex) {
throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
}
}
// 释放保存点
public void releaseSavepoint(Object savepoint) throws TransactionException {
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
try {
conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);
} catch (Throwable ex) {
logger.debug("Could not explicitly release JDBC savepoint", ex);
}
}
// 事务回滚
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
try {
con.rollback();
} catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
// 恢复连接的事务属性,比如自动提交方式、隔离级别、只读属性等
// 将连接归还给数据源,清除ConnectionHolder的conn
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
// 恢复之前挂起的事务
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
事务清理
private void restoreThreadLocalStatus() {
// Use stack to restore old transaction TransactionInfo.
// Will be null if none was set.
transactionInfoHolder.set(this.oldTransactionInfo);
}
事务提交
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
提交:
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
// Transactional code has requested rollback
// 回滚
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
// Global transaction is marked as rollback-only but transactional code requested commit
// 回滚
processRollback(defStatus, true);
return;
}
// 提交事务
processCommit(defStatus);
}
// 提交事务
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
unexpectedRollback = status.isGlobalRollbackOnly();
// 释放保存点
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
unexpectedRollback = status.isGlobalRollbackOnly();
// 提交事务
doCommit(status);
} else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
} catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
} else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
} catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
} finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
cleanupAfterCompletion(status);
}
}