Spring事务源码探究

Spring事务管理是Spring框架提供的一种机制,用于管理数据库事务。它提供了声明式事务管理和编程式事务管理两种方式。

1. 基础


1.1. 声明式事务管理

在声明式事务管理中,我们可以使用注解或XML配置来管理事务。常用的注解有@Transactional,可以标记在类或方法上,表示该类或方法需要事务管理。

1
2
3
4
5
6
@Transactional
public class UserService {
    public void updateUser(User user) {
        // 更新用户信息
    }
}

在XML配置中,我们可以使用<tx:advice><tx:attributes>来配置事务属性。

1
2
3
4
5
<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="update*" propagation="REQUIRED"/>
    </tx:attributes>
</tx:advice>

1.2. 编程式事务管理

编程式事务管理是通过编写代码来管理事务。Spring提供了TransactionTemplate来简化编程式事务管理的操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public class UserService {
    private TransactionTemplate transactionTemplate;
    
    public void updateUser(User user) {
        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
            @Override
            protected void doInTransactionWithoutResult(TransactionStatus status) {
                // 更新用户信息
            }
        });
    }
}

1.3. 事务传播行为

事务传播行为定义了在不同事务方法调用之间事务是如何传播的。常见的传播行为包括REQUIREDREQUIRES_NEWNESTED等。

1
2
3
4
@Transactional(propagation = Propagation.REQUIRED)
public void updateUserInfo(User user) {
    // 更新用户信息
}

1.4. 事务隔离级别

事务隔离级别定义了事务之间的隔离程度,包括READ_UNCOMMITTEDREAD_COMMITTEDREPEATABLE_READSERIALIZABLE等级别。

1
2
3
4
@Transactional(isolation = Isolation.READ_COMMITTED)
public void updateUserInfo(User user) {
    // 更新用户信息
}

Spring事务管理提供了灵活和强大的功能,可以帮助我们管理数据库事务,确保数据的一致性和完整性。

1.5. 源码中的定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
public interface TransactionDefinition {
    // 传播机制
	int PROPAGATION_REQUIRED = 0;
	int PROPAGATION_SUPPORTS = 1;
	int PROPAGATION_MANDATORY = 2;
	int PROPAGATION_REQUIRES_NEW = 3;
	int PROPAGATION_NOT_SUPPORTED = 4;
	int PROPAGATION_NEVER = 5;
	int PROPAGATION_NESTED = 6;
    
    // 隔离级别
	int ISOLATION_DEFAULT = -1;
	int ISOLATION_READ_UNCOMMITTED = 1;  // same as java.sql.Connection.TRANSACTION_READ_UNCOMMITTED;
	int ISOLATION_READ_COMMITTED = 2;  // same as java.sql.Connection.TRANSACTION_READ_COMMITTED;
	int ISOLATION_REPEATABLE_READ = 4;  // same as java.sql.Connection.TRANSACTION_REPEATABLE_READ;
	int ISOLATION_SERIALIZABLE = 8;  // same as java.sql.Connection.TRANSACTION_SERIALIZABLE;

	int TIMEOUT_DEFAULT = -1;
}

2. 使用方法

2.1. 主要注解

  • @EnableTransactionManagement
  • @Transactional
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@ComponentScan("org.springframework.gang")
@EnableAspectJAutoProxy
@EnableTransactionManagement
public class AppConfig {
    
    // ...
    
    
	@Bean
	public JdbcTemplate jdbcTemplate() {
		JdbcTemplate jdbctemplate = new JdbcTemplate(dataSource());
		return jdbctemplate;
	}

	@Bean
	public PlatformTransactionManager transactionManager() {
		DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
		transactionManager.setDataSource(dataSource());
		return transactionManager;
	}

	@Bean
	public DataSource dataSource() {
		DriverManagerDataSource dataSource = new DriverManagerDataSource();
		dataSource.setDriverClassName("com.mysql.jdbc.Driver");
		dataSource.setUrl("jdbc:mysql://localhost:3306/test");
		dataSource.setUsername("root");
		dataSource.setPassword("123456");
		return dataSource;
	}

}

2.2. EnableTrnasactionManagement

1
2
3
4
5
6
7
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
    // ...
}

导入了一个TransactionManagementConfigurationSelector这个类,

TransactionManagementConfigurationSelector.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
	@Override
	protected String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
                // 默认PROXY
				return new String[] {AutoProxyRegistrar.class.getName(),
						ProxyTransactionManagementConfiguration.class.getName()};
			case ASPECTJ:
                // 使用AspectJ
				return new String[] {determineTransactionAspectClass()};
			default:
				return null;
		}
	}

	private String determineTransactionAspectClass() {
		return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
				TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
				TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
	}

}

想Spring容器中注入了两个类:

  • AutoProxyRegistrar.java
  • ProxyTransactionManagementConfiguration.java

2.2.1. AutoProxyRegistrar

实际上是像容器中注册了InfrastructureAdvisorAutoProxyCreator.java

1
2
3
4
5
	@Nullable
	public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, @Nullable Object source) {
		// 注册 InfrastructureAdvisorAutoProxyCreator
		return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
	}

开启AOP,会生成Spring容器中的Advisor对象,但是和AnnotationAwareAspectJAutoProxyCreator不一样,他不会去解析AspectJ的那些注解,虽然都是AbstractAdvisorAutoProxyCreator的子类,但是功能不同。


2.2.2. ProxyTransactionManagementConfiguration

这个类像容器中又注册了三个Bean:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

	@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {

		BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
        
        
		advisor.setTransactionAttributeSource(transactionAttributeSource);
        
        // 代理逻辑
		advisor.setAdvice(transactionInterceptor);
		if (this.enableTx != null) {
			advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
		}
		return advisor;
	}

	@Bean
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public TransactionAttributeSource transactionAttributeSource() {
		return new AnnotationTransactionAttributeSource();
	}

	@Bean
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
		TransactionInterceptor interceptor = new TransactionInterceptor();
		interceptor.setTransactionAttributeSource(transactionAttributeSource);
		if (this.txManager != null) {
			interceptor.setTransactionManager(this.txManager);
		}
		return interceptor;
	}

}
  1. BeanFactoryTransactionAttributeSourceAdvisor:实际上就是一个Advisor。从该方法的参数中可以看到需要一个TransactionInterceptor,这个Bean在第三个方法返回的。
  2. TransactionAttributeSourceAnnotationTransactionAttributeSource定义了一个PointCut,这个类会解析@Transactional注解中的元素信息determineTransactionAttribute(AnnotatedElement element)
  3. TransactionInterceptor:事务的代理逻辑。

2.2.3. BeanFactoryTransactionAttributeSourceAdvisor

这个类是事务的Advisor,那么就要有PointCut和Advice两部分。首先来看PointCut到底在哪里,在这个类中有一个pointcut属性,内容如下:

1
2
3
4
5
6
7
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
    @Override
    @Nullable
    protected TransactionAttributeSource getTransactionAttributeSource() {
        return transactionAttributeSource;
    }
};

PointCut的筛选,要么是类筛选(ClassFilter),要么是方法筛选(MethodFilter),所以再进一步看一下TransactionAttributeSourcePointcut,构造方法如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
protected TransactionAttributeSourcePointcut() {
    setClassFilter(new TransactionAttributeSourceClassFilter());
}


private class TransactionAttributeSourceClassFilter implements ClassFilter {

    @Override
    public boolean matches(Class<?> clazz) {
        // 事务内部的几个类,要先排除掉。
        if (TransactionalProxy.class.isAssignableFrom(clazz) ||
            TransactionManager.class.isAssignableFrom(clazz) ||
            PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
            return false;
        }
        TransactionAttributeSource tas = getTransactionAttributeSource();
        return (tas == null || tas.isCandidateClass(clazz));
    }
}

TransactionAttributeSourceClassFilter是一个内部类,实现了类过滤的matches方法。

先过滤掉和事务相关的几个注解。

在是用tas的方法筛选出有@Transactional注解的类。调用链如下:

  1. org.springframework.transaction.annotation.AnnotationTransactionAttributeSource#isCandidateClass

  2. parser.isCandidateClass(targetClass)

  3. org.springframework.transaction.annotation.SpringTransactionAnnotationParser#isCandidateClass

    1
    2
    3
    4
    
    @Override
    public boolean isCandidateClass(Class<?> targetClass) {
        return AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
    }

    从上面的代码中可以看出,校验了类上是否有Transactional注解

TransactionAttributeSourcePointcut中还有方法的筛选逻辑:

1
2
3
4
5
@Override
public boolean matches(Method method, Class<?> targetClass) {
    TransactionAttributeSource tas = getTransactionAttributeSource();
    return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

tas.getTransactionAttribute(method, targetClass)这个方法会解析并缓存匹配到的TransactionAttribute

进入该方法之后,到达AbstractFallbackTransactionAttributeSource#computeTransactionAttribute中,在这个方法中会去解析@Transactional注解,经过parse之后生成TransactionAttribute,具体代码如下:

SpringTransactionAnnotationParser#parseTransactionAnnotation(AnnotationAttributes)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
    RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();

    Propagation propagation = attributes.getEnum("propagation");
    rbta.setPropagationBehavior(propagation.value());
    Isolation isolation = attributes.getEnum("isolation");
    rbta.setIsolationLevel(isolation.value());

    rbta.setTimeout(attributes.getNumber("timeout").intValue());
    String timeoutString = attributes.getString("timeoutString");
    Assert.isTrue(!StringUtils.hasText(timeoutString) || rbta.getTimeout() < 0,
                  "Specify 'timeout' or 'timeoutString', not both");
    rbta.setTimeoutString(timeoutString);

    rbta.setReadOnly(attributes.getBoolean("readOnly"));
    rbta.setQualifier(attributes.getString("value"));
    rbta.setLabels(Arrays.asList(attributes.getStringArray("label")));

    List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
    for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
        rollbackRules.add(new RollbackRuleAttribute(rbRule));
    }
    for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
        rollbackRules.add(new RollbackRuleAttribute(rbRule));
    }
    for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
        rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    }
    for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
        rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    }
    rbta.setRollbackRules(rollbackRules);

    return rbta;
}

3. Spring事务处理逻辑

3.1. TransactionInterceptor

1
2
3
4
@Transactional
public void insert() {
    jdbcTemplate.execute("insert into user(name,sex,age) values('test','男',18)");
}

主要的代理逻辑入口是TransactionInterceptor这个类的invoke()方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public Object invoke(MethodInvocation invocation) throws Throwable {
    
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

    return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
        @Override
        @Nullable
        public Object proceedWithInvocation() throws Throwable {

            // 执行后续的Interceptor, 以及被代理的方法:如userService的insert方法
            return invocation.proceed();
        }
        @Override
        public Object getTarget() {
            return invocation.getThis();
        }
        @Override
        public Object[] getArguments() {
            return invocation.getArguments();
        }
    });
}

在进入invokeWithinTransaction()方法,Spring事务主要的逻辑就是在这个方法中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                                         final InvocationCallback invocation) throws Throwable {

    // TransactionAttribute就是@Transactional中的配置
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 获取@Transactional注解中的属性值
    final TransactionAttribute txAttr = (
        tas != null ?
        tas.getTransactionAttribute(method, targetClass) :
        null
    );

    // 返回Spring容器中类型为TransactionManager的Bean对象
    final TransactionManager tm = determineTransactionManager(txAttr);

    // ReactiveTransactionManager用得少,并且它只是执行方式是响应式的,原理流程和普通的是一样的
    // ,..

    // 把tm强制转换成PlatformTransactionManager, 所以我们在定义时得定义PlatformTransactionManager类型
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);

    // joinpoint的唯一标识, 就是当前在执行方法的名字
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    // CallbackPreferringPlatformTransactionManager表示有回调功能的PlatformTransactionManager
    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.

        // ☆ ->
        // 如果有必要就创建事务,这里就涉及到事务传播机制的实现了
        // TransactionInfo表示一个逻辑事务,比如两个逻辑事务属于同一个物理事务
        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            // 执行下一个Interceptor或被代理对象中的方法
            retVal = invocation.proceedWithInvocation(); // test()
        } catch (Throwable ex) {
            // target invocation exception
            // 抛异常了, 回滚事务
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        } finally {
            cleanupTransactionInfo(txInfo);
        }

        if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
            // Set rollback-only in case of Vavr failure matching our rollback rules...
            TransactionStatus status = txInfo.getTransactionStatus();
            if (status != null && txAttr != null) {
                retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
        }

        // 提交事务
        commitTransactionAfterReturning(txInfo);
        return retVal;
    } else {
        // ...
    }
}

进入32行的createTransactionIfNecessary()这个方法,这个方法是判断是否需要开启新的事务以及开启事务。

3.2. 如何开启新事务的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected TransactionInfo createTransactionIfNecessary(
    @Nullable PlatformTransactionManager tm,
    @Nullable TransactionAttribute txAttr,
    final String joinpointIdentification
) {

    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    // 每个逻辑事务都会创建一个TransactionStatus,
    // 但是TransactionStatus中有一个属性代表当前逻辑事务底层的物理事务是不是新的
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {

            // ☆ ->
            // 开启事务
            status = tm.getTransaction(txAttr);
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
            }
        }
    }

    //
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

进入25行的tm.getTransaction(txAttr);方法中

AbstractPlatformTransactionManager.java

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {

    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    // 获取一个新的DataSourceTransactionObject对象
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // 判断是否存在一个事务
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        // 已经存在一个事务了
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // 不存在事务
    // Check definition settings for new transaction.
    // 判断@Transactional(timeout=?)
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        // 如果配置的事手动开启事务 则直接抛出异常
        throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
    } else if (
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED
    ) {
        // 当前的Thread中在没有事务的前提下, 上面三种传播机制是等价的

        // 没有事务需要挂起, 不过TransactionSynchronization有可能需要挂起
        // suspendedResources表示当前线程被挂起的资源持有对象(数据库连接、TransactionSynchronization)
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }

        try {
            // ☆ ->
            // 开启事务后, transaction中就会有数据库连接了, 并且是isTransactionActive为true的
            // 并返回TransactionStatus对象, 该对象保存了很多信息, 包括被挂起的资源
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        } catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    } else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + def);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

走上来会先判断是否已经存在一个事务了,如果已经存在了,则走handleExistingTransaction(def, transaction, debugEnabled);的逻辑,并直接return回去。否则继续往下执行。

如何判断当前已经存在事务了?

猜测是通过ThreadLocal,上面代码的第8行Object transaction = doGetTransaction();

org.springframework.jdbc.datasource.DataSourceTransactionManager

1
2
3
4
5
6
7
8
@Override
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

第5行getResource,最终会走到

org.springframework.transaction.support.TransactionSynchronizationManager

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");

@Nullable
private static Object doGetResource(Object actualKey) {
    Map<Object, Object> map = resources.get();
    if (map == null) {
        return null;
    }
    Object value = map.get(actualKey);
    // Transparently remove ResourceHolder that was marked as void...
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
        map.remove(actualKey);
        // Remove entire ThreadLocal if empty...
        if (map.isEmpty()) {
            resources.remove();
        }
        value = null;
    }
    return value;
}

上面的resources实际上就是要一个ThreadLocal。其中Map的key为DataSource,value为ConnectionHolder对象。

先看下已存在事务的情况:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
private TransactionStatus handleExistingTransaction(
			TransactionDefinition definition,
			Object transaction,
			boolean debugEnabled
	) throws TransactionException {

		// 判断当前事务的传播机制

		// 如果是NEVER 就直接报错
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
			// 以非事务方式运行,如果当前存在事务,则抛出异常
			throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
		}

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
			// 以非事务方式运行,如果当前存在事务,则把当前事务挂起。

			if (debugEnabled) {
				logger.debug("Suspending current transaction");
			}

			// 挂起当前事务, 不新建事务
			Object suspendedResources = suspend(transaction);
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(
					definition,
					null,
					false,
					newSynchronization,
					debugEnabled,
					suspendedResources
			);
		}

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
			// 不管当前有没有事务, 都会新建一个事务

			if (debugEnabled) {
				logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
			}

			// 先挂起当前事务
			SuspendedResourcesHolder suspendedResources = suspend(transaction);
			try {

				// 开启一个新事务
				return startTransaction(definition, transaction, debugEnabled, suspendedResources);
			} catch (RuntimeException | Error beginEx) {
				resumeAfterBeginException(transaction, suspendedResources, beginEx);
				throw beginEx;
			}
		}

		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			// 如果当前存在事务,则创建一个事务作为当前事务的嵌套事务来运行;如果当前没有事务,则该取值等价于 PROPAGATION_REQUIRED
			if (!isNestedTransactionAllowed()) {
				throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify 'nestedTransactionAllowed' property with value 'true'");
			}
			if (debugEnabled) {
				logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
			}
			if (useSavepointForNestedTransaction()) {
				DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);

				// 创建一个savepoint
				status.createAndHoldSavepoint();
				return status;
			} else {
				// Nested transaction through nested begin and commit/rollback calls.
				// Usually only for JTA: Spring synchronization might get activated here
				// in case of a pre-existing JTA transaction.
				return startTransaction(definition, transaction, debugEnabled, null);
			}
		}

		// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
		if (debugEnabled) {
			logger.debug("Participating in existing transaction");
		}

		if (isValidateExistingTransaction()) {
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
				Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
					Constants isoConstants = DefaultTransactionDefinition.constants;
					throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)"));
				}
			}
			if (!definition.isReadOnly()) {
				if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
					throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");
				}
			}
		}

		// 如果依然是Propagation.REQUIRED
		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
		return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
	}
}

3.3. suspend

回到主线AbstractPlatformTransactionManager#getTransaction()之中,如果当前线程中不存在事务,则先挂起,但是即然都没有事务了,还需要挂起什么呢,所以suspend(null)方法中的参数是null。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
		// synchronizations 是一个ThreadLocal<Set<TransactionSynchronization>>
		// 我们可以在任何地方通过TransactionSynchronizationManager给当前线程添加TransactionSynchronization.

		if (TransactionSynchronizationManager.isSynchronizationActive()) {
			// 调用TransactionSynchronization的suspend方法, 并清空和返回当前线程中所有的
			List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
			try {
				Object suspendedResources = null;
				if (transaction != null) {

					// 挂起事务, 把transaction中的Connection清空, 并把resources中的key-value进行移除,
					// 并返回数据连接Connection对象
					suspendedResources = doSuspend(transaction);
				}

				// 获取并清空当前线程中关于TransactionSynchronizationManager的设置,
				// 准备开启新的事务(因为你都要挂起了, 肯定是为了新开启别的事务)
				String name = TransactionSynchronizationManager.getCurrentTransactionName();
				TransactionSynchronizationManager.setCurrentTransactionName(null);
				boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
				TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
				Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
				boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
				TransactionSynchronizationManager.setActualTransactionActive(false);

				// 将当前线程中的数据库连接对象、TransactionSynchronization对象、TransactionSynchronizationManager中的设置构造成一个对象
				// 表示被挂起的资源持有对象,持有了当前线程中的事务对象、TransactionSynchronization对象
				return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);

			} catch (RuntimeException | Error ex) {
				// doSuspend failed - original transaction is still active...
				doResumeSynchronization(suspendedSynchronizations);
				throw ex;
			}
		} else if (transaction != null) {
			// Transaction active but no synchronization active.
			Object suspendedResources = doSuspend(transaction);
			return new SuspendedResourcesHolder(suspendedResources);
		} else {
			// Neither transaction nor synchronization active.
			return null;
		}
	}

如果没有事务的话TransactionSynchronizationManager.isSynchronizationActive()这个条件也是false。这个方法对应的是TransactionSynchronizationManager#initSynchronization()。这个方法在org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransaction方法的最后被间接调用到,在prepareSynchronization()可以看到。

那么挂起到底执行了什么?进入doSuspend()方法中:

DataSourceTransactionManager#doSuspend

1
2
3
4
5
6
@Override
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
  • txObject中的connetionHolder清空
  • 删除resource的ThreadLocal的DataSource
  • 返回连接对象。

除了doSuspend()方法中需要做的,还有suspend()中的剩下的逻辑,拿到当前TransactionSynchronizationManagement中的当前事务的名字啊,是否是readOnly啊,之类的其他属性(这些属性在事务同步管理器中都是以ThreadLocal的形式存储的),最后设置到SuspendedResourcesHolder中,并返回这个SuspendedResourcesHolder

从上面的过程来看,挂起实际上就是一个保存现场的过程

Spring这里还为我们提供了一个扩展点,当我们执行userService.insert()方法的时候,如果有这个需求:当前事务被挂起、恢复、事务提交前,事务提交后的时候需要执行一个操作,我们就可以向TransactionSynchronizationManager中注册一个TransactionSynchronization。代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Transactional
public void insert() {
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
        @Override
        public void suspend() {
            TransactionSynchronization.super.suspend();
        }

        @Override
        public void resume() {
            TransactionSynchronization.super.resume();
        }

        @Override
        public void beforeCommit(boolean readOnly) {
            TransactionSynchronization.super.beforeCommit(readOnly);
        }

        @Override
        public void afterCommit() {
            TransactionSynchronization.super.afterCommit();
        }
    });


    jdbcTemplate.execute("insert into user(name,sex,age) values('test','男',18)");
}

而我们注册的这个同步器什么时候被执行呢,执行的逻辑(挂起)就在doSuspendSynchronization()中。

1
2
3
4
5
6
7
8
private List<TransactionSynchronization> doSuspendSynchronization() {
    List<TransactionSynchronization> suspendedSynchronizations = TransactionSynchronizationManager.getSynchronizations();
    for (TransactionSynchronization synchronization : suspendedSynchronizations) {
        synchronization.suspend();
    }
    TransactionSynchronizationManager.clearSynchronization();
    return suspendedSynchronizations;
}

从代码中可以看出,拿到了所有的同步器,并且挨个执行suspend()扩展方法。


3.4. startTransaction

startTransaction(def, transaction, debugEnabled, suspendedResources);开启新事务的逻辑。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
private TransactionStatus startTransaction(
    TransactionDefinition definition,
    Object transaction,
    boolean debugEnabled,
    @Nullable SuspendedResourcesHolder suspendedResources
) {
    // 是否开启一个新的TransactionSynchronization
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

    // 开启的这个事务的状态信息
    // 事务的定义、用来保存数据库连接的对象、是否是新事务, 是否是新的TransactionSynchronization
    DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

    // ☆ -> DataSourceTransactionManager
    // 开启事务
    doBegin(transaction, definition);

    // 如果需要新开一个TransactionSynchronization, 就把新创建的事务的一些状态信息
    // 设置到TransactionSynchronizationManager中
    prepareSynchronization(status, definition);
    return status;
}

3.5. doBegin

doBegin(transaction, definition);开启事务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        // 如果当前线程中所使用的DataSource还没有创建过数据库连接, 就获取一个数据库连接
        if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = obtainDataSource().getConnection();
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        // 根据@Transactional注解中的设置, 设置Connection的readOnly与隔离级别
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        txObject.setReadOnly(definition.isReadOnly());

        // 设置autoCommit为false
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);

            // 设置
            con.setAutoCommit(false);
        }

        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);

        // 设置数据库连接超时时间
        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // 把新建的数据库连接设置到resources中,resources就是一个ThreadLocal<Map<Object, Object>>,
        // 事务管理器中的设置的DataSource对象为key,数据库连接对象为value
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    } catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}
  • 创建数据库连接
  • 设置设置相关属性
  • 将数据库连接设置到ThreadLocal中

当挂起完毕操作之后接下来就是doBegin()和前面提到的prepareSynchronization()方法,将新的事务设置到TransactionSynchronizationManager中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
            definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
            definition.getIsolationLevel() :
            null
        );
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        TransactionSynchronizationManager.initSynchronization();
    }
}

上面的逻辑处理完了之后,要么有异常回滚事务,要么正常结束提交事务。

对应到TransactionAspectSupport#invokeWithinTransaction()方法中的就是下面两个方法:

  1. commitTransactionAfterReturning(txInfo);:提交事务
  2. completeTransactionAfterThrowing(txInfo, ex);:回滚事务

3.6. commit

AbstractPlatformTransactionManager#commit

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
            "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;

    // 可以通过TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); 来设置
    // 判断是否需要强制回滚
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }

    // 判断此事务在之前是否设置了需要回滚, 跟globalRollbackOnParticipationFailure有关
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    processCommit(defStatus);
}
  • defStatus.isLocalRollbackOnly():这个方法是在判断是否是手动强制回滚了,在代码中我们可以通过下面的代码来让事务强制回滚,即使没有异常:

    1
    
    TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
  • defStatus.isGlobalRollbackOnly()

继续往下走,进入processCommit()方法中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;

            // 前面说到的register进去的几个回调方法
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;

            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            } else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();

                // ☆ ->
                doCommit(status);
            } else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                    "Transaction silently rolled back because it has been marked as rollback-only");
            }
        } catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            //  触发回调
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        } catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            } else {
                //  触发回调
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        } catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                //  触发回调
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            triggerAfterCommit(status);
        } finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    } finally {

        // 恢复被挂起的资源到当前线程中
        cleanupAfterCompletion(status);
    }
}

在上面的方法中else if (status.isNewTransaction())这个分支判断了当前事务是不是我正在执行的这个方法(如userService的insert方法)新建的,如果是,那么我就可以现在提交。如果不是,有可能是传播下来的,那我现在还不能提交,需要到创建这个事务的方法中去提交。


3.7. doCommit

最关键的方法就是doCommit()方法:

DataSourceTransactionManager#doCommit

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@Override
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {

        // 提交
        con.commit();
    } catch (SQLException ex) {
        throw translateException("JDBC commit", ex);
    }
}

由此整个事务顺利结束?并没有,还需要看一下之前有没有事务被挂起,如果有的话需要将挂起的事务恢复(Resume)。

doCommit()的最后有一个cleanupAfterCompletion()方法如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }
    if (status.isNewTransaction()) {
        // 这里会去关闭数据库连接
        doCleanupAfterCompletion(status.getTransaction());
    }

    // 恢复被挂起的资源到当前线程中
    if (status.getSuspendedResources() != null) {
        if (status.isDebug()) {
            logger.debug("Resuming suspended transaction after completion of inner transaction");
        }
        Object transaction = (status.hasTransaction() ? status.getTransaction() : null);

        // 恢复
        resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    }
}
  • status.isNewTransaction():判断当前的事务是不是我这个方法(如insert())中新建的,如果是新建的,还需要将连接释放掉。如果当前事务是传播下来的,那就只要提交就可以了。具体可见doCleanupAfterCompletion()方法

还剩一个事务失败的回滚流程completeTransactionAfterThrowing(txInfo, ex);

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                         "] after exception: " + ex);
        }

        // transactionAttribute的实现类为[RuleBasedTransactionAttribute], 父类为DefaultTransactionAttribute
        if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            } catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
        } else {
            // We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            try {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            } catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
        }
    }
}

txInfo.transactionAttribute.rollbackOn(ex)这里是在判断是否符合我们在@Transactional()中的rollbackFor参数对应的异常。

对应的匹配逻辑:

RuleBasedTransactionAttribute#rollbackOn

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean rollbackOn(Throwable ex) {
		RollbackRuleAttribute winner = null;
		int deepest = Integer.MAX_VALUE;

		if (this.rollbackRules != null) {
			// 遍历所有的RollbackRuleAttribute, 判断现在抛出的异常ex是否匹配RollbackRuleAttribute中指定的异常类型的子类或本身
			for (RollbackRuleAttribute rule : this.rollbackRules) {
				int depth = rule.getDepth(ex);
				if (depth >= 0 && depth < deepest) {
					deepest = depth;
					winner = rule;
				}
			}
		}

		// User superclass behavior (rollback on unchecked) if no rule matches.
		if (winner == null) {
			return super.rollbackOn(ex);
		}

		// ex所匹配的RollbackRuleAttribute, 可能是NoRollbackRuleAttribute, 如果是匹配的NoRollbackRuleAttribute,
		// 那就表示现在这个异常ex 不用刚回滚
		return !(winner instanceof NoRollbackRuleAttribute);
	}

3.8. rollback

1
2
3
4
5
6
7
8
9
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    processRollback(defStatus, false);
}

拿到defStatus直接调用下面的processRollback()方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;

        try {
            // 回滚之前先触发下面的回调
            triggerBeforeCompletion(status);

            if (status.hasSavepoint()) {
                
                // 回滚到上一个savepoint位置
                status.rollbackToHeldSavepoint();
            } else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }

                // ☆ ->
                // 如果当前执行的方法是新开的事务, 则直接回滚
                doRollback(status);
            } else {
                // Participating in larger transaction
                // 如果当前执行的方法已经有一个事务了, 而当前执行的方法抛出了异常, 则要判断整个事务到底要不要回滚, 看具体配置
                if (status.hasTransaction()) {

                    // 如果一个事务中有两个方法,第二个方法抛异常了,那么第二个方法就相当于执行失败需要回滚,
                    // 如果globalRollbackOnParticipationFailure为true,那么第一个方法在没有抛异常的情况下也要回滚
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }

                        // 直接将rollbackOnly设置到ConnectionHolder中去, 表示整个事务的sql都要回滚
                        doSetRollbackOnly(status);
                    } else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                } else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
                // Unexpected rollback only matters here if we're asked to fail early
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = false;
                }
            }
        } catch (RuntimeException | Error ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

        // Raise UnexpectedRollbackException if we had a global rollback-only marker
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
        }
    } finally {
        cleanupAfterCompletion(status);
    }
}

上面的代码中:status.rollbackToHeldSavepoint();


3.8.1. SavePoint

对应着MySQL中的savepoint,使用如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
begin;

insert into user(name,sex,age) values('test1','男',18);
insert into user(name,sex,age) values('test2','男',18);
insert into user(name,sex,age) values('test3','男',18);

savepoint x;

insert into user(name,sex,age) values('test4','男',18);
insert into user(name,sex,age) values('test5','男',18);

rollback to x;


commit;

此时只会回滚test4test5


3.9. doRollback

调用connection的rollback()方法,直接回滚。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Override
protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
    }
    try {
        con.rollback();
    } catch (SQLException ex) {
        throw translateException("JDBC rollback", ex);
    }
}

相关内容

0%