您的位置 首页 java

Spring5源码15-事务的创建、回滚、提交

1. 前言

1.1 transaction SynchronizationManager

TransactionSynchronizationManager 中使用 ThreadLocal 保存了在不同线程中不同事务的信息。

 public  abstract  class TransactionSynchronizationManager {

 private   static  final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);

private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");

private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");

private static final ThreadLocal< Boolean > currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");

private static final ThreadLocal<Integer> currentTransaction isolation Level =
new NamedThreadLocal<>("Current transaction isolation level");

private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");
...
}
复制代码  

我们从上面的部分代码可以看到,TransactionSynchronizationManager 中保存的是各个线程中的事务信息。

1.2 事务属性

1.2.1 只读

对一个查询操作来说,如果我们把它设置成只读,就能够明确告诉数据库,这个操作不涉及写操作。这 样数据库就能够针对查询操作来进行优化。

 @Transactional(readOnly = true)
复制代码  

注意:对增删改操作设置只读会抛出下面异常: Caused by: java.sql.SQL Exception : Connection is read-only. Queries leading to data modification are not allowed

1.2.2 超时

事务在执行过程中,有可能因为遇到某些问题,导致程序卡住,从而长时间占用数据库资源。而长时间 占用资源,大概率是因为程序运行出现了问题(可能是Java程序或 MySQL数据库 或网络连接等等)。 此时这个很可能出问题的程序应该被回滚,撤销它已做的操作,事务结束,把资源让出来,让其他正常 程序可以执行。概括来说就是一句话:超时回滚,释放资源。

 @Transactional(timeout = 3)
复制代码  

执行过程中抛出异常: org.springframework.transaction.TransactionTimedOutException: Transaction timed out: deadline was Fri Jun 04 16:25:39 CST 2022

1.2.3 回滚策略

声明式事务默认只针对运行时异常回滚,编译时异常不回滚。可以通过@Transactional中相关属性设置回滚策略:

  • rollbackFor属性:需要设置一个Class类型的对象
  • rollbackForClassName属性:需要设置一个 字符串 类型的全类名
  • noRollbackFor属性:需要设置一个Class类型的对象
  • noRollbackForClassName属性:需要设置一个字符串类型的全类名
 @Transactional(noRollbackFor = ArithmeticException.class)
@Transactional(noRollbackForClassName = "java.lang.ArithmeticException")
复制代码  

1.2.4 事务隔离级别

数据库系统必须具有隔离并发运行各个事务的能力,使它们不会相互影响,避免各种并发问题。一个事 务与其他事务隔离的程度称为隔离级别。SQL标准中规定了多种事务隔离级别,不同隔离级别对应不同 的干扰程度,隔离级别越高,数据一致性就越好,但并发性越弱。 隔离级别 一共有四种:

  • 读未提交:READ UNCOMMITTED : 允许 Transaction 01读取Transaction02未提交的修改。
  • 读已提交:READ COMMITTED: 要求Transaction01只能读取Transaction02已提交的修改。
  • 可重复读:REPEATABLE READ:确保Transaction01可以多次从一个字段中读取到相同的值,即Transaction01执行期间禁止其它事务对这个字段进行更新。
  • 串行化: SERIALIZABLE : 确保Transaction01可以多次从一个表中读取到相同的行,在Transaction01执行期间,禁止其它事务对这个表进行添加、更新、删除操作。可以避免任何并发问题,但性能十分低下。

各个隔离级别解决并发问题的能力见下表:

各种数据库产品对事务隔离级别的支持程度:

使用方式如下:

 @Transactional(isolation = Isolation.DEFAULT)//使用数据库默认的隔离级别
@Transactional(isolation = Isolation.READ_UNCOMMITTED)//读未提交
@Transactional(isolation = Isolation.READ_COMMITTED)//读已提交
@Transactional(isolation = Isolation.REPEATABLE_READ)//可重复读
@Transactional(isolation = Isolation.SERIALIZABLE)//串行化
复制代码  

1.2.5 事务传播行为

当事务方法被另一个事务方法调用时,必须指定事务应该如何传播。例如:方法可能继续在现有事务中运行,也可能开启一个新事务,并在自己的事务中运行。

事务传播属性

解释

PROPAGATION_REQUIRED

支持当前事务,如果当前没有事务,就新建一个事务。这是最常见的选择。即如果上级具有事务,则使用上级的事务,不具备则自己新建一个事务

PROPAGATION_REQUIRES_NEW

新建事务,如果当前存在事务,把当前事务挂起。即如果上级存在事务,则挂起上级事务,使用自己新创建的事务

PROPAGATION_MANDATORY

支持当前事务,如果当前没有事务,就抛出异常。即如果上级具有事务,则使用上级的事务,上级没有事务,则抛出异常

PROPAGATION_SUPPORTS

支持当前事务,如果当前没有事务,就以非事务方式执行。即如果上级具有事务,则使用上级的事务,如果上级没有事务,则不开启事务

PROPAGATION_NOT_SUPPORTED

以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。即如果上级具有事务,则使用挂起上级事务,使用非事务方式。

PROPAGATION_NEVER

以非事务方式执行,如果当前存在事务,则抛出异常

PROPAGATION_NESTED

如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则进行与PROPAGATION_REQUIRED类似的操作。

这里解释一下 PROPAGATION_NESTED 和 PROPAGATION_REQUIRES_NEW 的区别:

  • PROPAGATION_REQUIRES_NEW 启动一个新的, 不依赖于环境的 “内部” 事务. 这个事务将被完全 commited 或 rolledback 而不依赖于外部事务,它拥有自己的隔离范围, 自己的锁, 等等. 当内部事务开始执行时, 外部事务将被挂起, 内部事务结束时, 外部事务将继续执行。PROPAGATION_REQUIRES_NEW常用于日志记录,或者交易失败仍需要留痕
  • PROPAGATION_NESTED 开始一个 “嵌套的” 事务, 它是已经存在事务的一个真正的子事务. 潜套事务开始执行时, 它将取得一个 savepoint .如果这个嵌套事务失败, 我们将回滚到此 savepoint. 潜套事务是外部事务的一部分,只有外部事务结束后它才会被提交.

由此可见, PROPAGATION_REQUIRES_NEW 和PROPAGATION_NESTED 的最大区别在于:

  • PROPAGATION_REQUIRES_NEW 完全是一个新的事务, 而 PROPAGATION_NESTED 则是外部事务的子事务, 如果外部事务 commit, 潜套事务也会被 commit, 这个规则同样适用于 rollback.

2. 事务的创建 – createTransactionIfNecessary

上篇中,我们分析到 TransactionAspectSupport#invokeWithinTransaction 方法完成了事务的增强调用。而其中createTransactionIfNecessary 方法则是在需要的时候创建了事务,之所以说需要的时候而不是说直接创建,是因为这里要考虑到事务的传播属性。

createTransactionIfNecessary 的实现是在TransactionAspectSupport#createTransactionIfNecessary 中,完成了事务的创建,这里面考虑了事务的传播属性的处理,所以并不是一定会创建事务,根据传播属性的不同会有不同的处理。

详细代码如下:

 // TransactionAttribute 是解析出来的事务
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

        // If no name specified, apply method identification as transaction name.
        // 如果没有名称指定则使用方法唯一标识,并使用  DelegatingTransactionAttribute 封装 txAttr
        if (txAttr != null && txAttr.getName() == null) {
                txAttr = new DelegatingTransactionAttribute(txAttr) {
                        @Override
                        public String getName() {
                                return joinpointIdentification;
                        }
                };
        }

        TransactionStatus status = null;
        if (txAttr != null) {
                if (tm != null) {
                        // 获取事务
                        status = tm.getTransaction(txAttr);
                }
                else {
                        if (logger.isDebugEnabled()) {
                                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                                                "] because no transaction manager has been configured");
                        }
                }
        }
        // 构建事务信息
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
复制代码  

这里我们可以看到其基本逻辑如下:

  1. 使用 DelegatingTransactionAttribute 封装传入的 TransactionAttribute 实例。对于传入的 TransactionAttribute 类型的参数 txAttr ,当前实际类型是 RuleBasedTransactionAttribute,是由获取事务属性时生成的,主要用于数据承载,而这里之所以使用DelegatingTransactionAttribute 进行封装,也是为了提供更多的功能。
  2. 获取事务。即 tm.getTransaction(txAttr);,事务处理的核心当然是事务,这里获取到了事务。实际上getTransaction 方法返回的是 TransactionStatus (实现类是 DefaultTransactionStatus)。DefaultTransactionStatus 是对事务的进一步封装,包含了当前事务信息、挂起事务信息(如果有),保存点等信息。
  3. 构建事务信息。即prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); 。对上面几个步骤获取的信息构建 TransactionInfo 并返回。TransactionInfo 是 DefaultTransactionStatus 更进一步的封装。

我们来详细看看几个类的具体内容:

  • 关于事务管理器 TransactionManager ,不管是 Jpa 还是 jdbc 等都实现自接口 PlatformTransactionManager 如果你添加的是 spring-boot-starter-jdbc 依赖,框架会默认注入 DataSource TransactionManager 实例。如果你添加的是 spring-boot-starter-data-jpa 依赖,框架会默认注入 JpaTransactionManager 实例。
  • TransactionStatus (实际上的实现是 DefaultTransactionStatus) 里面包含的内容:
    • 这里注意suspendedResources 实际上保存了是挂起的上层事务的信息。如果没有上层事务(也就是没嵌套事务),就是null,这里是通过UserProxyServiceImpl#findAll(事务传播属性是REQUIRED) 调用 UserServiceImpl#finaAll(事务传播属性是 REQUIRES_NEW) 的方式挂起来了一个来自 com.kingfish.springjdbcdemo.service.UserProxyServiceImpl.findAll 方法的事务信息。 savepoint 只有在内嵌事务的隔离级别是 PROPAGATION_NESTED 才有可能会保存。
  • TransactionInfo 里面包含的内容:
    • 可以看到 TransactionInfo 是 TransactionStatus、TransactionAttribute、TransactionManager 等属性更进一步封装。
  • 关于事务挂起封装成的SuspendedResourcesHolder。

了解完上述一些类的保存内容后,下面我们来详细分析 createTransactionIfNecessary 中的几个方法

2.1 获取事务 – tm.getTransaction(txAttr)

实际上调用的是 AbstractPlatformTransactionManager#getTransaction 方法,在这里面获取了事务(可能是创建新事物,也可能不是),返回的类型是 TransactionStatus。

下面我们来看看其代码:

 @Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
                throws TransactionException {

        // Use defaults if no transaction definition given.
        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
        // 1. 获取事务
        Object transaction = doGetTransaction();
        boolean  debug Enabled = logger.isDebugEnabled();
        // 2. 判断当前 线程 是否存在事务,判断依据是当前线程记录的数据库连接不为空,且连接(connectionHolder)中的 transactionActive 属性 为true;
        // 这个方法的实现在 DataSourceTransactionManager#isExistingTransaction。
        if (isExistingTransaction(transaction)) {
                // Existing transaction found -> check propagation behavior to find out how to behave.
                // 3.当前线程已经存在事务,则按照嵌套事务的逻辑处理
                return handleExistingTransaction(def, transaction, debugEnabled);
        }
        // 到这里就表明当前线程没有事务存在了,即不会出现嵌套事务的情况了
        // Check definition settings for new transaction.
        // 事务超时验证
        if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
                throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        // 下面是针对事务传播属性进行处理了
        // 4. 如果传播属性是 PROPAGATION_MANDATORY 。但是当前线程又不存在事务,则抛出异常
        if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
                throw new IllegalTransactionStateException(
                                "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        // 5. 如果传播属性是PROPAGATION_REQUIRED 、PROPAGATION_REQUIRES_NEW 、PROPAGATION_NESTED 都需要新建事务
        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                // 5.1. 进行空挂起。为了记录原有事务的状态,便于后续操作对事务的恢复。因为这里原先并不存在事务,所以进行空挂起
                SuspendedResourcesHolder suspendedResources = suspend(null);
                if (debugEnabled) {
                        logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
                }
                try {
                        // 5.2.开启事务,并返回了事务状态
                        return startTransaction(def, transaction, debugEnabled, suspendedResources);
                }
                catch (RuntimeException | Error ex) {
                        resume(null, suspendedResources);
                        throw ex;
                }
        }
        else {
                // Create "empty" transaction: no actual transaction, but potentially synchronization.
                if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                        logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                                        "isolation level will effectively be ignored: " + def);
                }
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                // 6. 构建事务信息
                return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
        }
}
复制代码  

可以看到基本逻辑如下:

  1. 获取事务 : 这里使用的是 DataSourceTransactionManager#doGetTransaction。创建基于JDBC 的事务实例。如果当前线程存在关于 dataSource 的连接,那么直接使用。这里有对保存点的一个设置,是否开启允许保存点取决于是否设置了允许嵌入式事务。
  2. 如果当前线程存在事务,则转向嵌套事务的处理
  3. 进行事务超时设置的验证
  4. 如果传播属性是 PROPAGATION_MANDATORY 。但是当前线程又不存在事务,则抛出异常
  5. 如果传播属性是PROPAGATION_REQUIRED 、PROPAGATION_REQUIRES_NEW 、PROPAGATION_NESTED 都需要新建事务,通过 startTransaction 开始事务构建
  6. 不满足上述传播属性,则使用 prepareTransactionStatus 来进行事务构建

对于一些隔离级别、timeout等功能的设置并不是Spring完成的,而是委托给底层的数据库连接去做的。基于上面的逻辑我们再展开看一些方法:

2.1.1 doGetTransaction();

doGetTransaction(); 实际调用 DataSourceTransactionManager#doGetTransaction。目的就是获取事务对象。其实现逻辑很简单 :如果当前线程存在关于 dataSource 的连接,那么直接使用。这里有对保存点的一个设置,是否开启允许保存点取决于是否设置了允许嵌入式事务。

 protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        // 是否允许设置保存点 : 决定是否允许嵌套事务的存在
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        // 如果当前线程已经记录了数据库连接则使用原有连接
        ConnectionHolder conHolder =
                        (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
        // false 代表非新创建连接
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
}
复制代码  

2.1.2 isExistingTransaction(transaction)

isExistingTransaction(transaction) 的实现是DataSourceTransactionManager#isExistingTransaction 中。其作用是判断当前线程是否存在事务,判断依据是当前线程记录的数据库连接不为空,且连接(connectionHolder)中的 transactionActive 属性为true 。

 protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        // 存在连接,且其 transactionActive 属性不为空,isTransactionActive() 中的返回是  transactionActive = true 
        return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
复制代码  

2.1.3 handleExistingTransaction(def, transaction, debugEnabled);

handleExistingTransaction(def, transaction, debugEnabled); 的实现在 AbstractPlatformTransactionManager#handleExistingTransaction 中,这里是为了处理嵌套事务,也就是说进入这一步则说明当前线程已经存在了事务。

这里面主要还是根据传播属性的不同而进行不同的逻辑处理。代码如下:

 private TransactionStatus handleExistingTransaction(
                TransactionDefinition definition, Object transaction, boolean debugEnabled)
                throws TransactionException {
        // 如果传播属性是 PROPAGATION_NEVER,当时当前线程有事务,则抛出异常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
                throw new IllegalTransactionStateException(
                                "Existing transaction found for transaction marked with propagation 'never'");
        }
        // 如果传播属性是 PROPAGATION_NOT_SUPPORTED,则需要挂起当前事务。以不使用事务的形式调用
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {

                // 挂起当前事务
                Object suspendedResources = suspend(transaction);
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                // 准备事务信息
                return prepareTransactionStatus(
                                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
        // 如果传播属性是 PROPAGATION_REQUIRES_NEW,则需要挂起当前事务。新创建事务使用
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {

                // 挂起当前事务,关于 suspend 方法,我们下面会讲
                SuspendedResourcesHolder suspendedResources = suspend(transaction);
                try {
                        // 重新创建事务
                        return startTransaction(definition, transaction, debugEnabled, suspendedResources);
                }
                catch (RuntimeException | Error beginEx) {
                        resumeAfterBeginException(transaction, suspendedResources, beginEx);
                        throw beginEx;
                }
        }
                // 如果传播属性是 PROPAGATION_NESTED,则如果当前存在事务,则在嵌套事务内执行。否则自己创建事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                if (!isNestedTransactionAllowed()) {
                        throw new NestedTransactionNotSupportedException(
                                        "Transaction manager does not allow nested transactions by default - " +
                                        "specify 'nestedTransactionAllowed' property with value 'true'");
                }
                // 如果可以使用保存点的方式控制事务回滚,则在嵌入式事务的建立时便建立保存点
                if (useSavepointForNestedTransaction()) {
                        // Create savepoint within existing Spring-managed transaction,
                        // through the SavepointManager API implemented by TransactionStatus.
                        // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                        // 嵌入式事务的建立时便建立保存点
                        DefaultTransactionStatus status =
                                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                        // 创建设置保存点
                        status.createAndHoldSavepoint();
                        return status;
                }
                else {
                        // Nested transaction through nested begin and commit/rollback calls.
                        // Usually only for JTA: Spring synchronization might get activated here
                        // in case of a pre-existing JTA transaction.
                        // 有些情况下是不能使用保存点操作,如 JTA,这时候就需要新建事务
                        return startTransaction(definition, transaction, debugEnabled, null);
                }
        }

        // 进行事务的合法性校验
        if (isValidateExistingTransaction()) {
                if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                        Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                        if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                                Constants isoConstants = DefaultTransactionDefinition.constants;
                                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                                definition + "] specifies isolation level which is incompatible with existing transaction: " +
                                                (currentIsolationLevel != null ?
                                                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                                                "(unknown)"));
                        }
                }
                if (!definition.isReadOnly()) {
                        if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                                definition + "] is not marked as read-only but existing transaction is");
                        }
                }
        }
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
复制代码  

需要注意的是:

  • 对于 PROPAGATION_REQUIRES_NEW 传播属性,其表示当前方法必须要在他自己的事务中运行,一个新的事务被启动,所以原先的事务会被先挂起(suspend),挂起后作为当前事务 TransactionStatus 的一个属性(suspendedResources)存在。当前事务执行结束后,再将原事务还原。
  • 对于 PROPAGATION_NESTED 传播属性,由于其需要的并非是全新的事务,而且当前事务的子事务。Spring考虑了两种情况
    • Spring中允许嵌套事务的时候,则首选设置保存点的方式作为异常处理的回滚。
    • 对于其他方式,比如 JTA 无法使用保存点的方式,那么处理方式和 PROPAGATION_REQUIRES_NEW相同,一旦出现异常,则由spring的事务异常处理机制去完成后续操作。

2.1.4 suspend(null)

suspend(null); 调用的是 AbstractPlatformTransactionManager#suspend 内容。其主要作用是为了记录原有事务的状态,便于后续操作对事务的恢复,实际上是将原事务的信息封装成 SuspendedResourcesHolder ,作为 TransactionStatus的一个属性存在。(我们这里的空挂起直接会返回null, 其他情况则会返回一个正常的SuspendedResourcesHolder)。代码基本逻辑如下:

  • 记录原事务信息
  • 清除原事务信息
  • 将原事务信息封装成 SuspendedResourcesHolder 返回,以便于外界恢复事务时使用

其代码如下:

 protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
        // 如果当前事务处于激活状态
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
                List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
                try {
                        Object suspendedResources = null;
                        if (transaction != null) {
                                // 这里主要将当前事务的数据源(ConnectionHolder)解绑
                                suspendedResources = doSuspend(transaction);
                        }
                        // 获取当前事务name、readOnly、isolationLevel 、wasActive  等属性,封装成 SuspendedResourcesHolder 返回
                        // 同时将当前事务的各种信息重置
                        String name = TransactionSynchronizationManager.getCurrentTransactionName();
                        TransactionSynchronizationManager.setCurrentTransactionName(null);
                        boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
                        TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
                        Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
                        boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
                        TransactionSynchronizationManager.setActualTransactionActive(false);
                        return new SuspendedResourcesHolder(
                                        suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
                }
                catch (RuntimeException | Error ex) {
                        // doSuspend failed - original transaction is still active...
                        doResumeSynchronization(suspendedSynchronizations);
                        throw ex;
                }
        }
        // 如果当前事务并未激活且存在transaction 
        else if (transaction != null) {
                // Transaction active but no synchronization active.
                Object suspendedResources = doSuspend(transaction);
                return new SuspendedResourcesHolder(suspendedResources);
        }
        else {
                // Neither transaction nor synchronization active.
                return null;
        }
}
复制代码  

2.1.5 startTransaction(def, transaction, debugEnabled, suspendedResources);

startTransaction(def, transaction, debugEnabled, suspendedResources); 的实现在AbstractPlatformTransactionManager#startTransaction 中,主要作用是创建新的事务。经历了上面几步处理,到达这一步的时候说明,当前已经不是嵌套事务。

 private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
                boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        // 创建一个默认的DefaultTransactionStatus 
        DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        // 构造 Transaction,包括设置 ConnectionHolder、隔离级别、timeout。
        // 并且如果是新连接,则绑定当当前线程。
        doBegin(transaction, definition);
        // 新同步事务的设置,针对于当期线程的设置
        prepareSynchronization(status, definition);
        return status;
}
复制代码  

我们详细看下面两个方法:

doBegin(transaction, definition);

doBegin(transaction, definition); 的实现在 DataSourceTransactionManager#doBegin 中。 这里的目的是 为了构造 transaction,包括设置 ConnectionHolder、隔离级别、timeout。如果是新连接,则绑定到当前线程。

 protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
                // 如果当前线程中的数据库连接不存在,或者事务同步为true的情况下需要重新获取数据库连接,进行同步
                if (!txObject.hasConnectionHolder() ||
                                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                        Connection newCon = obtainDataSource().getConnection();
                        if (logger.isDebugEnabled()) {
                                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                        }
                        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
                }
                // 将同步标识设置为 true
                txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
                con = txObject.getConnectionHolder().getConnection();
                // 设置事务隔离级别
                Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
                txObject.setPreviousIsolationLevel(previousIsolationLevel);
                txObject.setReadOnly(definition.isReadOnly());

                // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
                // so we don't want to do it unnecessarily (for example if we've explicitly
                // configured the connection pool to set it already).
                // 更改自动提交,将数据库的自动提交改为 Spring 来控制,否则数据库执行结束sql后自动提交了
                if (con.getAutoCommit()) {
                        txObject.setMustRestoreAutoCommit(true);
                        if (logger.isDebugEnabled()) {
                                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                        }
                        con.setAutoCommit(false);
                }
                // 准备事务连接,这里实际上执行了 SET TRANSACTION READ ONLY 的sql 语句
                prepareTransactionalConnection(con, definition);
                // 设置当前线程已经存在事务,这个 transactionActive 属性是判断是否当前线程存在事务的依据
                txObject.getConnectionHolder().setTransactionActive(true);
                // 设置超时时间
                int timeout = determineTimeout(definition);
                if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                        txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
                }

                // Bind the connection holder to the thread.
                // 如果是新的连接,则绑定数据库连接到当前线程
                if (txObject.isNewConnectionHolder()) {
                        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
                }
        }

        catch (Throwable ex) {
                if (txObject.isNewConnectionHolder()) {
                        DataSourceUtils.releaseConnection(con, obtainDataSource());
                        txObject.setConnectionHolder(null, false);
                }
                throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
}

...

protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition)
                throws SQLException {

        if (isEnforceReadOnly() && definition.isReadOnly()) {
                try (Statement stmt = con.createStatement()) {
                        // 设置事务为只读
                        stmt.executeUpdate("SET TRANSACTION READ ONLY");
                }
        }
}
复制代码  

可以说事务是从这个函数开始的,因为在这个函数中已经开始尝试对数据库连接的获取,并在在获取数据库连接的同时,也进行了一些必要的设置的同步。

  • 尝试获取连接。但并不是每一次都会获取新的连接,如果当前线程中的 ConnectionHolder 已经存在,则没必要再次获取,或者对事物同步表示设置为true的需要重新获取连接。
  • 设置隔离级别和只读标识
  • 更改事务的默认提交设置。
  • 如果事务属性是自动提交,那么需要改变这个设置,将操作委托给Spring来处理。
  • 设置标志位,标识当前连接已经被事务激活。
  • 设置过期时间
  • 将ConnectionHolder 绑定到当前线程

我们这里额外看一下 DataSourceUtils.prepareConnectionForTransaction 方法是如何设置隔离级别的

 public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition)
                throws SQLException {

        Assert.notNull(con, "No Connection specified");

        boolean debugEnabled = logger.isDebugEnabled();
        // Set read-only flag.
        // 设置属性只读
        if (definition != null && definition.isReadOnly()) {
                try {
                        if (debugEnabled) {
                                logger.debug("Setting JDBC Connection [" + con + "] read-only");
                        }
                        con.setReadOnly(true);
                }
                catch (SQLException | RuntimeException ex) {
                        Throwable exToCheck = ex;
                        while (exToCheck != null) {
                                if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
                                        // Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0
                                        throw ex;
                                }
                                exToCheck = exToCheck.getCause();
                        }
                        // "read-only not supported" SQLException -> ignore, it's just a hint anyway
                        logger.debug("Could not set JDBC Connection read-only", ex);
                }
        }

        // Apply specific isolation level, if any.
        // 设置数据库隔离级别
        Integer previousIsolationLevel = null;
        if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                if (debugEnabled) {
                        logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +
                                        definition.getIsolationLevel());
                }
                int currentIsolation = con.getTransactionIsolation();
                if (currentIsolation != definition.getIsolationLevel()) {
                        previousIsolationLevel = currentIsolation;
                        con.setTransactionIsolation(definition.getIsolationLevel());
                }
        }

        return previousIsolationLevel;
}
复制代码  

可以看到,在 DataSourceUtils.prepareConnectionForTransaction 方法中并没有什么复杂的逻辑,因为其主要实现都交由更底层的 数据库API 来完成了。

prepareSynchronization(status, definition);

prepareSynchronization(status, definition); 的实现在AbstractPlatformTransactionManager#prepareSynchronization 中。其目的是将事务信息记录到当前线程中,逻辑很简单,这里不再赘述。

 protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        // 如果是新的事务,则需要同步信息
        if (status.isNewSynchronization()) {
                // 下面是对事务的信息的记录,记录到当前线程中。
                TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
                TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                                definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                                                definition.getIsolationLevel() : null);
                TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
                TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
                TransactionSynchronizationManager.initSynchronization();
        }
}
复制代码  

2.2 构建事务信息 – prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);

prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); 的实现是在TransactionAspectSupport#prepareTransactionInfo 中。

当已经建立事务连接并完成了事务信息的提取后,我们需要将所有的事务信息统一记录在 TransactionInfo 类型的实例中,这个实例包含了目标方法开始前的所有状态信息,一旦事务执行失败,Spring会通过 TransactionInfo 类实例中的信息来进行回滚等后续工作。

详细代码如下:

 protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
                @Nullable TransactionAttribute txAttr, String joinpointIdentification,
                @Nullable TransactionStatus status) {
        // 封装成 TransactionInfo  实例
        TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
        if (txAttr != null) {

                // 记录事务状态
                txInfo.newTransactionStatus(status);
        }
        else {

        }
        // 绑定到线程当前上
        txInfo.bindToThread();
        return txInfo;
}
复制代码  

2.3 总结

TransactionAspectSupport#createTransactionIfNecessary 的功能是根据需要创建事务。这里面考虑到嵌套事务的情况,并对事务的传播属性进行了相应的处理,最终处理后。返回的是一个 TransactionInfo 的值,里面封装了事务的各种信息,供给后面的回滚或者提交使用。

3. 事务的回滚 – completeTransactionAfterThrowing

completeTransactionAfterThrowing 的实现是 TransactionAspectSupport#completeTransactionAfterThrowing。在出现异常的时候,通过该方法进行事务的回滚。

下面我们来看具体代码实现:

 protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
        // 判断当前线程中是否存在事务
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
                if (logger.isTraceEnabled()) {
                        logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                                        "] after exception: " + ex);
                }
                // 判断是否触发回滚操作,这里的条件是异常是否是 RuntimeException 或 error类型
                // 即 return (ex instanceof RuntimeException || ex instanceof Error);
                if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                        try {
                                // 执行回滚操作
                                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                        }
                        catch (TransactionSystemException ex2) {
                                logger.error("Application exception overridden by rollback exception", ex);
                                ex2.initApplicationException(ex);
                                throw ex2;
                        }
                        catch (RuntimeException | Error ex2) {
                                logger.error("Application exception overridden by rollback exception", ex);
                                throw ex2;
                        }
                }
                else {
                        // We don't roll back on this exception.
                        // Will still roll back if TransactionStatus.isRollbackOnly() is true.
                        try {
                                // 如果不满足回滚条件,则还是会提交,也就是说如果抛出异常不是 RuntimeException 或 error类型,则不会触发事务的回滚。
                                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                        }
                        catch (TransactionSystemException ex2) {
                                logger.error("Application exception overridden by commit exception", ex);
                                ex2.initApplicationException(ex);
                                throw ex2;
                        }
                        catch (RuntimeException | Error ex2) {
                                logger.error("Application exception overridden by commit exception", ex);
                                throw ex2;
                        }
                }
        }
}
复制代码  

上面可以看到,触发回滚操作的判断条件是

 txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)
复制代码  

txInfo.transactionAttribute != null 自不必说,表示必须存在事务属性信息,即是事务方法。txInfo.transactionAttribute.rollbackOn(ex)默认的实现如下:

 public boolean rollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
}
复制代码  

也就是说只有当异常类型是 Error或者 RuntimeException 才会进行回滚。 这里需要注意,如果抛出了其他类型异常,那么并不代表事务没有启用,而是回滚没有触发。

txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); 回滚的具体操作即即 中的 AbstractPlatformTransactionManager#rollback 方法

 @Override
public final void rollback(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
                throw new IllegalTransactionStateException(
                                "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }

        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        // 执行回滚
        processRollback(defStatus, false);
}

....

// 省略了日志打印
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
        try {
                boolean unexpectedRollback = unexpected;

                try {
                        // 激活所有 TransactionSynchronization 中对应的方法 beforeCompletion() 方法
                        triggerBeforeCompletion(status);
                        // 如果当前事务有保存点,也就是当前事务为单独的线程则会退到保存点
                        if (status.hasSavepoint()) {
                                status.rollbackToHeldSavepoint();
                        }
                        else if (status.isNewTransaction()) {
                                // 如果当前事务为独立的新事物,则直接回退
                                doRollback(status);
                        }
                        else {
                                // Participating in larger transaction
                                if (status.hasTransaction()) {
                                        if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                                                // 如果当前事务不是独立的事务,那么只能标记状态,等到事务链执行完毕后统一回滚
                                                doSetRollbackOnly(status);
                                        }
                                        else {

                                        }
                                }
                                else {
                                }
                                // Unexpected rollback only matters here if we're asked to fail early
                                if (!isFailEarlyOnGlobalRollbackOnly()) {
                                        unexpectedRollback = false;
                                }
                        }
                }
                catch (RuntimeException | Error ex) {
                        triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                        throw ex;
                }
                // 激活所有 TransactionSynchronization 中对应的方法 afterCompletion() 方法
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

                // Raise UnexpectedRollbackException if we had a global rollback-only marker
                if (unexpectedRollback) {
                        throw new UnexpectedRollbackException(
                                        "Transaction rolled back because it has been marked as rollback-only");
                }
        }
        finally {
                // 清空记录的资源并将挂起的资源恢复
                cleanupAfterCompletion(status);
        }
}
复制代码  

我们可以简单总结一下整个脉络:

  1. 首先是自定义触发器的调用,这里是Spring提供的回滚扩展点,包括在回滚前、回滚成功、回滚失败的触发器调用,自定义触发器会根据这些信息作出进一步的处理。而对于触发器的注册,常见的是在回调过程中通过 TransactionSynchronizationManager#registerSynchronization 方法完成,
  2. 除了触发监听函数外,就是真正的回滚逻辑处理了,而这里的具体回滚操作都委托给了底层数据库连接提供的API 来操作的。回滚的基本策略是: 当之前已经保存的事务信息中有保存点信息的时候,使用保存点信息回滚。常用于嵌入式事务,对于嵌入式事务的处理,内嵌的事务异常并不会引起外部事物的回滚。 对于之前没有保存的事务信息中的事务为新事物,那么直接回滚。常用语单独事务的处理,对于没有保存点的回滚,Spring同样是使用底层数据库连接提供的API 来操作的。我们这里则是 DataSourceTransactionManager#doRollback 当前事务信息标明是存在事务的,又不属于以上两种情况,多数用于 JTA,只做回滚标识,等到提交的时候统一不提交。
  3. 回滚后的信息清除。对于回滚逻辑执行结束后,无论回滚是否成功,都必须要将信息清除。

3.1 自定义触发器的调用

自定义触发器的调用,包括在回滚前、回滚成功、回滚失败的触发器调用。实现逻辑都类似,我们这里挑一个回滚前的触发器看一看逻辑:

 protected final void triggerBeforeCompletion(DefaultTransactionStatus status) {
        if (status.isNewSynchronization()) {
                if (status.isDebug()) {
                        logger.trace("Triggering beforeCompletion synchronization");
                }
                // 触发前置操作
                TransactionSynchronizationUtils.triggerBeforeCompletion();
        }
}
复制代码  

TransactionSynchronizationUtils.triggerBeforeCompletion(); 的代码如下:

 public static void triggerBeforeCompletion() {
        // 获取所有TransactionSynchronization 触发 beforeCompletion 方法
        for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
                try {
                        synchronization.beforeCompletion();
                }
                catch (Throwable tsex) {
                        logger.error("TransactionSynchronization.beforeCompletion threw exception", tsex);
                }
        }
}
复制代码  

triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); 和 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); 与之逻辑类似,这里就不再看了。

3.2 回滚逻辑处理

回滚操作最底层的操作都是委托给了数据库API 来实现,所以这里并没有负责的逻辑。

回滚逻辑上面也说了有三种情况,下面我们来一个一个看一看:

3.2.1. status.rollbackToHeldSavepoint();

status.rollbackToHeldSavepoint();的实现在 AbstractTransactionStatus#rollbackToHeldSavepoint 中。

处理情况是 :当之前已经保存的事务信息中有保存点信息的时候,使用保存点信息回滚。常用于嵌入式事务,对于嵌入式事务(并非是嵌套service事务)的处理,内嵌的事务异常并不会引起外部事物的回滚。

 public void rollbackToHeldSavepoint() throws TransactionException {
        Object savepoint = getSavepoint();
        if (savepoint == null) {
                throw new TransactionUsageException(
                                "Cannot roll back to savepoint - no savepoint associated with current transaction");
        }
        // 回滚到 savepoint 
        getSavepointManager().rollbackToSavepoint(savepoint);
        // 释放保存点
        getSavepointManager().releaseSavepoint(savepoint);
        setSavepoint(null);
}
复制代码  

这里使用的是 JDBC 的方式进行数据库了连接,所以这里调用的是 getSavepoint(); 返回的是 JdbcTransactionObjectSupport类型。所以这里我们来看 JdbcTransactionObjectSupport 中的

 @Override
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
        ConnectionHolder conHolder = getConnectionHolderForSavepoint();
        try {
                // 直接调用 Connection的rollback 方法
                conHolder.getConnection().rollback((Savepoint) savepoint);
                conHolder.resetRollbackOnly();
        }
        catch (Throwable ex) {
                throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
        }
}
@Override
public void releaseSavepoint(Object savepoint) throws TransactionException {
        ConnectionHolder conHolder = getConnectionHolderForSavepoint();
        try {
                // 直接调用 Connection的releaseSavepoint方法
                conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);
        }
        catch (Throwable ex) {
                logger.debug("Could not explicitly release JDBC savepoint", ex);
        }
}
复制代码  

3.2.2. doRollback(status);

doRollback(status); 的实现在DataSourceTransactionManager#doRollback 中。

处理情况是 :对于之前没有保存的事务信息中的事务为新事物,那么直接回滚。常用语单独事务的处理,对于没有保存点的回滚,Spring同样是使用底层数据库连接提供的API 来操作的。

 @Override
protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
                logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
        }
        try {
                // 直接调用rollback方法 回滚
                con.rollback();
        }
        catch (SQLException ex) {
                throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
}
复制代码  

3.2.3. doSetRollbackOnly(status);

doSetRollbackOnly(status); 的实现在DataSourceTransactionManager#doSetRollbackOnly 中。

处理情况: 当前事务信息标明是存在事务的,又不属于以上两种情况,多数用于 JTA(Java Transaction API),只做回滚标识,等到提交的时候统一不提交。即当外围事务进行提交时,发现内嵌的事务中 rollbackOnly 被设置成true。则直接进行回滚,而不再尝试提交

 @Override
protected void doSetRollbackOnly(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        if (status.isDebug()) {
                logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
                                "] rollback-only");
        }
        txObject.setRollbackOnly();
}
复制代码  

3.3 回滚信息清除

cleanupAfterCompletion(status); 的实现在 AbstractPlatformTransactionManager#cleanupAfterCompletion 中。 其作用是完成事务回滚的收尾工作,主要包括以下内容 :

  • 设置状态是对事务信息做完成标识以避免重复调用
  • 如果当前事务是新的同步状态,需要将绑定到当前线程的事务信息清除
  • 如果是新事物需要做出清除资源的工作
 private void cleanupAfterCompletion(DefaultTransactionStatus status) {
        status.setCompleted();
        if (status.isNewSynchronization()) {
                // 清除当前线程的中关于该事务的信息
                TransactionSynchronizationManager.clear();
        }
        if (status.isNewTransaction()) {
                // 清理事务信息
                doCleanupAfterCompletion(status.getTransaction());
        }
        if (status.getSuspendedResources() != null) {
                if (status.isDebug()) {
                        logger.debug("Resuming suspended transaction after completion of inner transaction");
                }
                Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
                // 结束之前事务的挂起状态。
                resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
        }
}
复制代码  

我们来看看下面的两个方法:

3.3.1. doCleanupAfterCompletion(status.getTransaction());

doCleanupAfterCompletion(status.getTransaction()); 的实现在DataSourceTransactionManager#doCleanupAfterCompletion 中。在这里完成事务回滚的收尾工作。

 @Override
protected void doCleanupAfterCompletion(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

        // Remove the connection holder from the thread, if exposed.
        if (txObject.isNewConnectionHolder()) {
                // 将数据库连接从当前线程中解除绑定
                TransactionSynchronizationManager.unbindResource(obtainDataSource());
        }
        // Reset connection.
        // 释放连接
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
                if (txObject.isMustRestoreAutoCommit()) {
                        // 恢复数据库连接自动提交属性
                        con.setAutoCommit(true);
                }
                // 重置数据连接
                DataSourceUtils.resetConnectionAfterTransaction(
                                con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
        }
        catch (Throwable ex) {
                logger.debug("Could not reset JDBC Connection after transaction", ex);
        }

        if (txObject.isNewConnectionHolder()) {
                if (logger.isDebugEnabled()) {
                        logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
                }
                // 如果当前事务是独立的新创建的事务则在事务完成时释放数据库连接。
                DataSourceUtils.releaseConnection(con, this.dataSource);
        }

        txObject.getConnectionHolder().clear();
}
复制代码  

3.3.2. resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());

如果在事务执行前有事务挂起,那么当前事务执行结束后需要将挂起的事务恢复。

 protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
                throws TransactionException {

        if (resourcesHolder != null) {
                // 获取挂起的事务
                Object suspendedResources = resourcesHolder.suspendedResources;
                if (suspendedResources != null) {
                        // 恢复事务
                        doResume(transaction, suspendedResources);
                }
                List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
                // 设置事务属性
                if (suspendedSynchronizations != null) {
                        TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
                        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
                        TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
                        TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
                        doResumeSynchronization(suspendedSynchronizations);
                }
        }
}
复制代码  

从上面整个回滚流程可以看到,很多的操作,Spring都交给数据库底层实现了,Spring仅仅起到了一个管理的作用。

3.4 总结

事务的回滚操作,根据不同的情况执行不同的回滚策略。并且在回滚结束之后恢复了挂起的事务(如果有挂起事务)。

4. 事务的提交 – commitTransactionAfterReturning

commitTransactionAfterReturning 的实现在 TransactionAspectSupport#commitTransactionAfterReturning 中。完成了事务的提交任务。

上面我们分析了Spring的事务异常处理机制。如果事务的执行并没有出现任何异常,那么也就意味着事务可以走正常的提交流程了。

 protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {
                if (logger.isTraceEnabled()) {
                        logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
                }
                // 进行事务提交
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
}
复制代码  

在真正的数据提交前,还需要做一个判断,在上面分析事务异常处理规则的时候,当某个事务既没有保存点也不是新事物的时候,Spring对他的处理方式只是设置一个回滚标识(在事务回滚的第三种回滚策略中完成 )。这个回滚标识在这里就会派上用场了。应用场景如下:

  • 某个事务是另一个事物的嵌套事务,但是这些事务又不在Spring的管理范围内,或者无法设置保存点(比如分布式事务场景,A 调用B,B异常,则会标记回滚标识,在B提交时会进行回滚),那么Spring会通过设置回滚标识的方式来禁止提交。首先当某个嵌入事务发生回滚的时候会设置回滚标识,而等到外部事物提交时,一旦判断当前事务流被设置了回滚标识,则由外部事物统一来进行整体的事务回滚。

所以当事务没有被异常捕获的时候也并不意味着一定会执行提交流程。

我们来看看 commit 方法。commit方法的实现在 AbstractPlatformTransactionManager#commit 中

 @Override
public final void commit(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
                throw new IllegalTransactionStateException(
                                "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }

        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        // 如果在事务链中已经被标记回滚,那么不会尝试提交事务,直接回滚。
        if (defStatus.isLocalRollbackOnly()) {
                if (defStatus.isDebug()) {
                        logger.debug("Transactional code has requested rollback");
                }
                processRollback(defStatus, false);
                return;
        }

        if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
                if (defStatus.isDebug()) {
                        logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
                }
                processRollback(defStatus, true);
                return;
        }
        // 处理事务提交
        processCommit(defStatus);
}
复制代码  

4.1 processRollback(defStatus, false);

这个方法是回滚方法,请移步观看回滚文章

4.2 processCommit(defStatus);

processCommit(defStatus); 的实现在AbstractPlatformTransactionManager#processCommit 中。在这个方法中完成了事务的提交操作。在提交过程中也并不是直接提交,而是考虑了诸多方面。符合提交条件的如下:

  • 当事务状态中有保存点信息的话便不会去提交事务。
  • 当事务非新事务的时候也不会去执行提交事务操作。 此条件主要考虑内嵌事务的情况们对于内嵌事务,在Spring中正常的处理方式是将内嵌式无开始之前设置保存点,一旦内嵌事务出现啊异常便根据保存点信息进行回滚,但是如果没有出现异常,内嵌事务并不会单独提交,而是根据事务流最外层事务负责提交,所以如果当前存在保存点信息便不是最外层事务,不做保存操作,对于是否是新事物的判断也是基于此考虑。
 private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
                boolean beforeCompletionInvoked = false;

                try {
                        boolean unexpectedRollback = false;
                        // 预留操作
                        prepareForCommit(status);
                        // 调用自定义触发器的方法
                        triggerBeforeCommit(status);
                        triggerBeforeCompletion(status);
                        beforeCompletionInvoked = true;
                        // 如果设置了保存点信息
                        if (status.hasSavepoint()) {
                                if (status.isDebug()) {
                                        logger.debug("Releasing transaction savepoint");
                                }
                                // 清除保存点信息
                                unexpectedRollback = status.isGlobalRollbackOnly();
                                status.releaseHeldSavepoint();
                        }
                        // 如果是新事物
                        else if (status.isNewTransaction()) {
                                if (status.isDebug()) {
                                        logger.debug("Initiating transaction commit");
                                }
                                unexpectedRollback = status.isGlobalRollbackOnly();
                                // 如果是独立事务则直接提交
                                doCommit(status);
                        }
                        // 不是新事物并不会直接提交,而是等最外围事务进行提交。
                        else if (isFailEarlyOnGlobalRollbackOnly()) {
                                unexpectedRollback = status.isGlobalRollbackOnly();
                        }

                        // Throw UnexpectedRollbackException if we have a global rollback-only
                        // marker but still didn't get a corresponding exception from commit.
                        if (unexpectedRollback) {
                                throw new UnexpectedRollbackException(
                                                "Transaction silently rolled back because it has been marked as rollback-only");
                        }
                }
                catch (UnexpectedRollbackException ex) {
                        // can only be caused by doCommit
                        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                        throw ex;
                }
                catch (TransactionException ex) {
                        // can only be caused by doCommit
                        if (isRollbackOnCommitFailure()) {
                                doRollbackOnCommitException(status, ex);
                        }
                        else {
                                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                        }
                        throw ex;
                }
                catch (RuntimeException | Error ex) {
                        if (!beforeCompletionInvoked) {
                                triggerBeforeCompletion(status);
                        }
                        // 提交过程中会出现异常则回滚
                        doRollbackOnCommitException(status, ex);
                        throw ex;
                }

                // Trigger afterCommit callbacks, with an exception thrown there
                // propagated to callers but the transaction still considered as committed.
                try {
                        triggerAfterCommit(status);
                }
                finally {
                        triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
                }

        }
        finally {
                // 清理事务信息
                cleanupAfterCompletion(status);
        }
}
复制代码  

DataSourceTransactionManager#doCommit 中的实现也就是至极调用数据库连接的 commit 方法。

 protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
                logger.debug("Committing JDBC transaction on Connection [" + con + "]");
        }
        try {
                // 调用commit 方法
                con.commit();
        }
        catch (SQLException ex) {
                throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
}
复制代码  

4.3 总结

事务的提交并不是直接提交,也是有几个方面的考虑:

  • 如果事务被设置了回滚标识,则不会提交,直接回滚
  • 如果事务中有保存点信息则不会提交事务操作。
  • 如果事务非新事务的时候也不会去执行提交事务操作。

文章来源:智云一二三科技

文章标题:Spring5源码15-事务的创建、回滚、提交

文章地址:https://www.zhihuclub.com/196040.shtml

关于作者: 智云科技

热门文章

网站地图