[Spring Source Code] An Analysis of Spring's Database Transaction Source Code

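Setting up a requirement

When I read source code unrelated to databases before, the project depended only on spring-boot-starter-web. Now that a database is involved, we need to add the MySQL driver and the simplest possible JDBC framework.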

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>

Configure the database connection in application.yml.

spring:
  datasource:
    url: jdbc:mysql://192.168.1.152:3306/spring_trans?useUnicode=true&characterEncoding=utf8&autoReconnect=true&allowMultiQueries=true
    username: root
    password: root

A controller that accepts a request whose data must be inserted into the database:

@RestController
public class UserController {

    @Autowired
    private UserService userService;

    @PostMapping("users")
    public void add(@RequestParam("name") String name, @RequestParam("orgName") String orgName) {
        userService.add(name, orgName);
    }

}

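Since this is only a demo, the example is as simple as possible: insert a user together with the organization the user belongs to, a one-to-one relationship. The SQL statements are executed directly through JdbcTemplate.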

public interface UserService {
    void add(String name, String orgName);
}

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    @Transactional
    public void add(String name, String orgName) {
        String userId = UUID.randomUUID().toString();
        jdbcTemplate.update("insert into user_info(user_uuid, user_name)\n" +
                "values (?,?);", userId, name);
        // Throw a RuntimeException (here an ArithmeticException) to trigger the rollback
        int i = 1 / 0;
        String orgUUID = UUID.randomUUID().toString();
        jdbcTemplate.update("insert into org_info(org_uuid, org_name, user_uuid)\n" +
                "VALUES (?, ?, ?);", orgUUID, orgName, userId);
    }

}

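Applying transactions

  • Annotation-based transactions: a class or method annotated with @Transactional gets transactional behavior. Strictly speaking this is also declarative transaction management; the programmatic style would be TransactionTemplate. (This is the usual choice in the Spring Boot era.)
  • XML-based declarative transactions: defined in a configuration file, which requires a company-wide method naming convention. In the configuration below, methods named update* or insert run in a read-write REQUIRED transaction. Making every other method read-only needs an additional catch-all rule such as <tx:method name="*" read-only="true"/>, which this snippet does not show.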
<tx:advice id="advice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="update*" propagation="REQUIRED" read-only="false" rollback-for="java.lang.Exception"/>
        <tx:method name="insert" propagation="REQUIRED" read-only="false"/>
    </tx:attributes>
</tx:advice>

<aop:config>
    <aop:pointcut id="testService" expression="execution (* com.liweidan.service.MyBatisService.*(..))"/>
    <aop:advisor advice-ref="advice" pointcut-ref="testService"/>
</aop:config>

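UserServiceImpl in the example above uses the annotation-based approach. Without @Transactional, when the exception is thrown at int i = 1 / 0;, the user row inserted just before it is not rolled back, which does not match everyday business requirements. From here on, let's look at how Spring applies transactions to our code.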

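Dependency structure

Let's take a quick look at what spring-boot-starter-jdbc pulls in. The dependency graph shows that spring-jdbc, spring-tx, and HikariCP all come along. spring-jdbc wraps operations close to raw JDBC, such as JdbcTemplate. spring-tx, the transaction management module that holds Spring's own transaction abstractions, is the focus of this article. HikariCP is a high-performance database connection pool.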

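Based on AOP

I clearly remember that 《从零开始架构WEB系统》 describes using AOP to cut into a project: obtain a Connection, keep it in a ThreadLocal for the current thread, take it out and start a transaction when an insert or update is invoked, and after the method finishes check whether it completed normally. If an exception occurred, roll back the inserted data; otherwise commit. We can read the Spring source with roughly this idea in mind.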
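The idea can be sketched with nothing but the JDK. This is a minimal illustration, not Spring's implementation: FakeConnection, ConnectionHolder, AccountService, and TxProxyDemo are hypothetical names invented for this example, and the fake connection only records calls instead of talking to a database.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for java.sql.Connection: it only records what was called.
class FakeConnection {
    final List<String> log = new ArrayList<>();
    void execute(String sql) { log.add("execute: " + sql); }
    void commit() { log.add("commit"); }
    void rollback() { log.add("rollback"); }
}

// Keeps the connection that belongs to the current thread.
class ConnectionHolder {
    static final ThreadLocal<FakeConnection> CURRENT = new ThreadLocal<>();
}

interface AccountService {
    void save(boolean fail);
}

class AccountServiceImpl implements AccountService {
    @Override
    public void save(boolean fail) {
        // Business code picks up the thread-bound connection
        ConnectionHolder.CURRENT.get().execute("insert into user_info");
        if (fail) {
            throw new IllegalStateException("simulated failure");
        }
    }
}

class TxProxyDemo {
    // Wraps the target in a JDK proxy that commits on success and rolls back on failure.
    static AccountService transactional(AccountService target, FakeConnection conn) {
        return (AccountService) Proxy.newProxyInstance(
                AccountService.class.getClassLoader(),
                new Class<?>[] {AccountService.class},
                (proxy, method, args) -> {
                    ConnectionHolder.CURRENT.set(conn);
                    try {
                        Object result = method.invoke(target, args);
                        conn.commit();
                        return result;
                    } catch (InvocationTargetException e) {
                        conn.rollback();
                        throw e.getCause();
                    } finally {
                        ConnectionHolder.CURRENT.remove();
                    }
                });
    }

    // Runs one call through the proxy and returns what the connection recorded.
    static List<String> runAndLog(boolean fail) {
        FakeConnection conn = new FakeConnection();
        try {
            transactional(new AccountServiceImpl(), conn).save(fail);
        } catch (IllegalStateException expected) {
            // the proxy has already rolled back before rethrowing
        }
        return conn.log;
    }

    public static void main(String[] args) {
        System.out.println(runAndLog(false)); // [execute: insert into user_info, commit]
        System.out.println(runAndLog(true));  // [execute: insert into user_info, rollback]
    }
}
```

The business method never mentions commit or rollback; the proxy decides based on whether the call returned or threw, which is the same shape the Spring interceptor has later in this article.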

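Auto-configuration

As mentioned in the previous post, Spring Boot automatically loads the auto-configuration classes under the org.springframework.boot.autoconfigure package. For transactions, the relevant one is org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration. Because the PlatformTransactionManager class is already on the project's classpath, this configuration is applied automatically.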

// TransactionAutoConfiguration.java: 
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(PlatformTransactionManager.class)
@AutoConfigureAfter({ JtaAutoConfiguration.class, HibernateJpaAutoConfiguration.class,
DataSourceTransactionManagerAutoConfiguration.class, Neo4jDataAutoConfiguration.class })
@EnableConfigurationProperties(TransactionProperties.class)
public class TransactionAutoConfiguration {

@Bean
@ConditionalOnMissingBean
public TransactionManagerCustomizers platformTransactionManagerCustomizers(
ObjectProvider<PlatformTransactionManagerCustomizer<?>> customizers) {
return new TransactionManagerCustomizers(customizers.orderedStream().collect(Collectors.toList()));
}

@Bean
@ConditionalOnMissingBean
@ConditionalOnSingleCandidate(ReactiveTransactionManager.class)
public TransactionalOperator transactionalOperator(ReactiveTransactionManager transactionManager) {
return TransactionalOperator.create(transactionManager);
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnSingleCandidate(PlatformTransactionManager.class)
public static class TransactionTemplateConfiguration {

@Bean
@ConditionalOnMissingBean(TransactionOperations.class)
public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
return new TransactionTemplate(transactionManager);
}

}

@Configuration(proxyBeanMethods = false)
@ConditionalOnBean(TransactionManager.class)
@ConditionalOnMissingBean(AbstractTransactionManagementConfiguration.class)
public static class EnableTransactionManagementConfiguration {

@Configuration(proxyBeanMethods = false)
// The key part is this annotation, which imports additional configuration
@EnableTransactionManagement(proxyTargetClass = false)
@ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "false",
matchIfMissing = false)
public static class JdkDynamicAutoProxyConfiguration {

}

@Configuration(proxyBeanMethods = false)
@EnableTransactionManagement(proxyTargetClass = true)
@ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "true",
matchIfMissing = true)
public static class CglibAutoProxyConfiguration {

}

}

}
// EnableTransactionManagement.java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// Imports a selector
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {

/**
* Indicate whether subclass-based (CGLIB) proxies are to be created ({@code true}) as
* opposed to standard Java interface-based proxies ({@code false}). The default is
* {@code false}. <strong>Applicable only if {@link #mode()} is set to
* {@link AdviceMode#PROXY}</strong>.
* <p>Note that setting this attribute to {@code true} will affect <em>all</em>
* Spring-managed beans requiring proxying, not just those marked with
* {@code @Transactional}. For example, other beans marked with Spring's
* {@code @Async} annotation will be upgraded to subclass proxying at the same
* time. This approach has no negative impact in practice unless one is explicitly
* expecting one type of proxy vs another, e.g. in tests.
*/
boolean proxyTargetClass() default false;

/**
* Indicate how transactional advice should be applied.
* <p><b>The default is {@link AdviceMode#PROXY}.</b>
* Please note that proxy mode allows for interception of calls through the proxy
* only. Local calls within the same class cannot get intercepted that way; an
* {@link Transactional} annotation on such a method within a local call will be
* ignored since Spring's interceptor does not even kick in for such a runtime
* scenario. For a more advanced mode of interception, consider switching this to
* {@link AdviceMode#ASPECTJ}.
*/
AdviceMode mode() default AdviceMode.PROXY;

/**
* Indicate the ordering of the execution of the transaction advisor
* when multiple advices are applied at a specific joinpoint.
* <p>The default is {@link Ordered#LOWEST_PRECEDENCE}.
*/
int order() default Ordered.LOWEST_PRECEDENCE;

}
// TransactionManagementConfigurationSelector.java
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {

/**
* Returns {@link ProxyTransactionManagementConfiguration} or
* {@code AspectJ(Jta)TransactionManagementConfiguration} for {@code PROXY}
* and {@code ASPECTJ} values of {@link EnableTransactionManagement#mode()},
* respectively.
*/
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}

private String determineTransactionAspectClass() {
return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
}

}
// ConfigurationClassParser.java
private void processImports(ConfigurationClass configClass, SourceClass currentSourceClass,
Collection<SourceClass> importCandidates, boolean checkForCircularImports) {

if (importCandidates.isEmpty()) {
return;
}

if (checkForCircularImports && isChainedImportOnStack(configClass)) {
this.problemReporter.error(new CircularImportProblem(configClass, this.importStack));
}
else {
this.importStack.push(configClass);
try {
for (SourceClass candidate : importCandidates) {
if (candidate.isAssignable(ImportSelector.class)) {
// Candidate class is an ImportSelector -> delegate to it to determine imports
Class<?> candidateClass = candidate.loadClass();
ImportSelector selector = ParserStrategyUtils.instantiateClass(candidateClass, ImportSelector.class,
this.environment, this.resourceLoader, this.registry);
if (selector instanceof DeferredImportSelector) {
this.deferredImportSelectorHandler.handle(configClass, (DeferredImportSelector) selector);
}
else {
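// When importing other configuration classes, the selectImports method shown above
// is called, and the returned configuration classes are then parsed recursively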
String[] importClassNames = selector.selectImports(currentSourceClass.getMetadata());
Collection<SourceClass> importSourceClasses = asSourceClasses(importClassNames);
processImports(configClass, currentSourceClass, importSourceClasses, false);
}
// .....
}
}
}
}

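At this point the contents of the configuration classes have been registered in the BeanFactory. As described in the earlier Spring AOP post, every time the BeanFactory initializes a bean it calls BeanPostProcessor#postProcessAfterInitialization, which is where the proxy for the real bean is created. That job is handed to InfrastructureAdvisorAutoProxyCreator from the AOP module, which wraps the bean in a proxy.

To save space I only show the key code fragments and move quickly through the earlier steps.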

protected Object doCreateBean(final String beanName, final RootBeanDefinition mbd, final @Nullable Object[] args)
throws BeanCreationException {
// ......
Object exposedObject = bean;
try {
populateBean(beanName, mbd, instanceWrapper);
// Wrap the actual bean
exposedObject = initializeBean(beanName, exposedObject, mbd);
}
catch (Throwable ex) {
if (ex instanceof BeanCreationException && beanName.equals(((BeanCreationException) ex).getBeanName())) {
throw (BeanCreationException) ex;
}
else {
throw new BeanCreationException(
mbd.getResourceDescription(), beanName, "Initialization of bean failed", ex);
}
}
// ......
}
// initializeBean
protected Object initializeBean(final String beanName, final Object bean, @Nullable RootBeanDefinition mbd) {
if (System.getSecurityManager() != null) {
AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
invokeAwareMethods(beanName, bean);
return null;
}, getAccessControlContext());
}
else {
invokeAwareMethods(beanName, bean);
}

Object wrappedBean = bean;
if (mbd == null || !mbd.isSynthetic()) {
wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
}

try {
invokeInitMethods(beanName, wrappedBean, mbd);
}
catch (Throwable ex) {
throw new BeanCreationException(
(mbd != null ? mbd.getResourceDescription() : null),
beanName, "Invocation of init method failed", ex);
}
if (mbd == null || !mbd.isSynthetic()) {
// Initialization is done; let the registered post-processors process the bean
wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);
}

return wrappedBean;
}
// applyBeanPostProcessorsAfterInitialization: wraps the bean using the post-processors
public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName)
throws BeansException {

Object result = existingBean;
for (BeanPostProcessor processor : getBeanPostProcessors()) {
// The loop reaches InfrastructureAdvisorAutoProxyCreator or AnnotationAwareAspectJAutoProxyCreator, which creates the proxy
Object current = processor.postProcessAfterInitialization(result, beanName);
if (current == null) {
return result;
}
result = current;
}
return result;
}
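The post-processing loop above can be modelled in a few lines of plain Java. This is a simplified sketch under stated assumptions: PostProcessor, MiniContainer, Greeter, and PostProcessorDemo are hypothetical names, and the "[tx]" prefix merely marks that a call went through the proxy. It shows how a post-processor can hand back a proxy in place of the bean it received.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

interface Greeter {
    String greet(String name);
}

class PlainGreeter implements Greeter {
    @Override
    public String greet(String name) { return "hello " + name; }
}

// Hypothetical, simplified counterpart of BeanPostProcessor#postProcessAfterInitialization.
interface PostProcessor {
    Object afterInit(Object bean, String beanName);
}

class MiniContainer {
    private final List<PostProcessor> processors = new ArrayList<>();

    void add(PostProcessor p) { processors.add(p); }

    // Same shape as the loop above: each processor may replace the bean,
    // and a null result stops the chain and keeps the last non-null bean.
    Object applyAfterInit(Object bean, String beanName) {
        Object result = bean;
        for (PostProcessor p : processors) {
            Object current = p.afterInit(result, beanName);
            if (current == null) {
                return result;
            }
            result = current;
        }
        return result;
    }
}

class PostProcessorDemo {
    static Greeter build() {
        MiniContainer container = new MiniContainer();
        // A post-processor that swaps every Greeter for a JDK proxy around it.
        container.add((bean, name) -> {
            if (!(bean instanceof Greeter)) {
                return bean;
            }
            Greeter target = (Greeter) bean;
            return Proxy.newProxyInstance(
                    Greeter.class.getClassLoader(),
                    new Class<?>[] {Greeter.class},
                    (proxy, method, args) -> {
                        Object value = method.invoke(target, args);
                        return method.getName().equals("greet") ? "[tx] " + value : value;
                    });
        });
        return (Greeter) container.applyAfterInit(new PlainGreeter(), "greeter");
    }

    public static void main(String[] args) {
        System.out.println(build().greet("spring")); // [tx] hello spring
    }
}
```

Callers of the container only ever see the object returned by the last post-processor, which is why a bean injected elsewhere is already the proxy.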

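Note: whether InfrastructureAdvisorAutoProxyCreator or AnnotationAwareAspectJAutoProxyCreator is used depends on whether the project pulls in the Spring AOP starter (spring-boot-starter-aop, which brings in AspectJ). If it does, AnnotationAwareAspectJAutoProxyCreator handles the work. The difference between the two: InfrastructureAdvisorAutoProxyCreator does not weave user-defined AOP aspects.

Next, AbstractAdvisorAutoProxyCreator (the common parent of both) finds all Advisor implementations and weaves them into the methods of the real object. Because BeanFactoryTransactionAttributeSourceAdvisor was already registered in the BeanFactory's beanDefinitionMap when the configuration classes were parsed, creating the first bean (most likely one of Spring's own beans) triggers the configuration class that declares BeanFactoryTransactionAttributeSourceAdvisor, and its factory method is invoked. That goes through getBean, doCreateBean, and so on once more to create the advisor. With the advisor created, we now need to see how it gets woven in.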

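Weaving into methods

As for how matching is decided: the earlier post covered cutting in via @Pointcut, so the approach here should be easy to guess. The class-level annotation plus the method-level annotation determine whether the current method is intercepted. Whichever of the two auto-proxy creators is used, execution always arrives at this method: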

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
return bean;
}
if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
return bean;
}
if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}

// Get the AOP interceptor chain required by the current bean
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
if (specificInterceptors != DO_NOT_PROXY) {
this.advisedBeans.put(cacheKey, Boolean.TRUE);
// Create the proxy object
Object proxy = createProxy(
bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
this.proxyTypes.put(cacheKey, proxy.getClass());
return proxy;
}

this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}
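The "class annotation plus method annotation" rule that feeds wrapIfNecessary can be illustrated with plain reflection. This is a deliberately simplified sketch: @Tx is a hypothetical marker standing in for @Transactional, TxMatcher is an invented helper, and Spring's real lookup additionally considers interfaces and bridge methods, which are left out here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical marker annotation standing in for @Transactional.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface Tx {}

@Tx
class TypeLevelService {
    public void anyMethod() {}
}

class MethodLevelService {
    @Tx
    public void write() {}

    public void read() {}
}

class PlainService {
    public void read() {}
}

class TxMatcher {
    // A method matches if it is annotated itself or if its target class is annotated.
    static boolean matches(Method method, Class<?> targetClass) {
        return method.isAnnotationPresent(Tx.class) || targetClass.isAnnotationPresent(Tx.class);
    }

    // Names of the public methods (excluding those inherited from Object) that match, sorted.
    static List<String> matchingMethods(Class<?> targetClass) {
        List<String> names = new ArrayList<>();
        for (Method m : targetClass.getMethods()) {
            if (m.getDeclaringClass() != Object.class && matches(m, targetClass)) {
                names.add(m.getName());
            }
        }
        Collections.sort(names);
        return names;
    }

    // A bean only needs a proxy when at least one of its methods matches.
    static boolean needsProxy(Class<?> targetClass) {
        return !matchingMethods(targetClass).isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(matchingMethods(TypeLevelService.class));   // [anyMethod]
        System.out.println(matchingMethods(MethodLevelService.class)); // [write]
        System.out.println(needsProxy(PlainService.class));            // false
    }
}
```

A bean with no matching method corresponds to the DO_NOT_PROXY branch above: it is returned unwrapped.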

Next, a ProxyFactory is configured and used to create the proxy object.

protected Object createProxy(Class<?> beanClass, @Nullable String beanName,
@Nullable Object[] specificInterceptors, TargetSource targetSource) {

if (this.beanFactory instanceof ConfigurableListableBeanFactory) {
AutoProxyUtils.exposeTargetClass((ConfigurableListableBeanFactory) this.beanFactory, beanName, beanClass);
}

ProxyFactory proxyFactory = new ProxyFactory();
proxyFactory.copyFrom(this);

if (!proxyFactory.isProxyTargetClass()) {
if (shouldProxyTargetClass(beanClass, beanName)) {
proxyFactory.setProxyTargetClass(true);
}
else {
evaluateProxyInterfaces(beanClass, proxyFactory);
}
}

Advisor[] advisors = buildAdvisors(beanName, specificInterceptors);
proxyFactory.addAdvisors(advisors);
proxyFactory.setTargetSource(targetSource);
customizeProxyFactory(proxyFactory);

proxyFactory.setFrozen(this.freezeProxy);
if (advisorsPreFiltered()) {
proxyFactory.setPreFiltered(true);
}

return proxyFactory.getProxy(getProxyClassLoader());
}

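Inside getProxy, the configuration determines which proxy implementation is used (JDK dynamic proxy or CGLIB), and then the getProxy method that both implementations provide is called to obtain the proxy instance.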

public Object getProxy(@Nullable ClassLoader classLoader) {
return createAopProxy().getProxy(classLoader);
}
protected final synchronized AopProxy createAopProxy() {
if (!this.active) {
activate();
}
return getAopProxyFactory().createAopProxy(this);
}
// DefaultAopProxyFactory.java
@Override
public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
Class<?> targetClass = config.getTargetClass();
if (targetClass == null) {
throw new AopConfigException("TargetSource cannot determine target class: " +
"Either an interface or a target is required for proxy creation.");
}
if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
return new JdkDynamicAopProxy(config);
}
// Although my class implements an interface, execution arrives here and CGLIB is used
return new ObjenesisCglibAopProxy(config);
}
else {
return new JdkDynamicAopProxy(config);
}
}
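The branching in createAopProxy can be restated as a small pure function, which makes it easier to see why a class with an interface still ends up with CGLIB. This is only a sketch of the decision: ProxyChoice and its boolean parameters are hypothetical names that mirror the conditions in the code above.

```java
class ProxyChoice {
    enum Kind { JDK, CGLIB }

    // Mirrors the conditions in DefaultAopProxyFactory#createAopProxy shown above.
    static Kind choose(boolean optimize,
                       boolean proxyTargetClass,
                       boolean hasUserSuppliedInterfaces,
                       boolean targetIsInterfaceOrJdkProxy) {
        if (optimize || proxyTargetClass || !hasUserSuppliedInterfaces) {
            // Even here, an interface or an existing JDK proxy class is proxied by the JDK
            return targetIsInterfaceOrJdkProxy ? Kind.JDK : Kind.CGLIB;
        }
        return Kind.JDK;
    }

    public static void main(String[] args) {
        // UserServiceImpl has an interface, but proxyTargetClass is true
        System.out.println(choose(false, true, true, false));  // CGLIB
        // Same bean with proxyTargetClass set to false
        System.out.println(choose(false, false, true, false)); // JDK
        // A class without any interface
        System.out.println(choose(false, false, false, false)); // CGLIB
    }
}
```

The first case is the one hit in this article: the interface is present, yet proxyTargetClass alone is enough to select CGLIB.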

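Spring Boot proxy mode

A small question came up above: why was CGLIB used even though the class has an interface? That differs from what I saw when reading Spring AOP earlier. The explanation lies in Spring Boot's auto-configuration: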

// META-INF/additional-spring-configuration-metadata.json
{
"name": "spring.aop.proxy-target-class",
"type": "java.lang.Boolean",
"description": "Whether subclass-based (CGLIB) proxies are to be created (true), as opposed to standard Java interface-based proxies (false).",
"defaultValue": true
},

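After searching Google and the Spring projects, however, the only explanation I found was a single remark from a Spring team member:

The gist is that CGLIB proxies reduce class cast exceptions.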
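The kind of class cast exception in question is easy to reproduce with a JDK dynamic proxy. The sketch below uses hypothetical names (OrderService, OrderServiceImpl, ProxyCastDemo) and only the JDK: an interface-based proxy implements the interface but is not an instance of the implementation class, so code that casts or injects by the concrete type fails. A CGLIB proxy is generated as a subclass of the target class, so the same cast succeeds, although that half cannot be shown without the CGLIB library.

```java
import java.lang.reflect.Proxy;

interface OrderService {
    String place();
}

class OrderServiceImpl implements OrderService {
    @Override
    public String place() { return "placed"; }
}

class ProxyCastDemo {
    // Builds an interface-based (JDK) proxy that simply delegates to the target.
    static Object jdkProxy(OrderService target) {
        return Proxy.newProxyInstance(
                OrderService.class.getClassLoader(),
                new Class<?>[] {OrderService.class},
                (proxy, method, args) -> method.invoke(target, args));
    }

    // Returns true when the bean cannot be cast to the concrete implementation class.
    static boolean castToImplFails(Object bean) {
        try {
            OrderServiceImpl impl = (OrderServiceImpl) bean;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object proxy = jdkProxy(new OrderServiceImpl());
        System.out.println(((OrderService) proxy).place()); // placed
        System.out.println(castToImplFails(proxy));         // true
    }
}
```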

Weaving in the interceptor

@Override
public Object getProxy(@Nullable ClassLoader classLoader) {
// Part of the code is omitted here (including the opening try block that the catch clauses below belong to)
// Configure CGLIB Enhancer...
Enhancer enhancer = createEnhancer();
if (classLoader != null) {
enhancer.setClassLoader(classLoader);
if (classLoader instanceof SmartClassLoader &&
((SmartClassLoader) classLoader).isClassReloadable(proxySuperClass)) {
enhancer.setUseCache(false);
}
}
enhancer.setSuperclass(proxySuperClass);
enhancer.setInterfaces(AopProxyUtils.completeProxiedInterfaces(this.advised));
enhancer.setNamingPolicy(SpringNamingPolicy.INSTANCE);
enhancer.setStrategy(new ClassLoaderAwareGeneratorStrategy(classLoader));

// The callbacks are where the interceptor held by BeanFactoryTransactionAttributeSourceAdvisor
// gets woven in: TransactionInterceptor
Callback[] callbacks = getCallbacks(rootClass);
Class<?>[] types = new Class<?>[callbacks.length];
for (int x = 0; x < types.length; x++) {
types[x] = callbacks[x].getClass();
}
// fixedInterceptorMap only populated at this point, after getCallbacks call above
enhancer.setCallbackFilter(new ProxyCallbackFilter(
this.advised.getConfigurationOnlyCopy(), this.fixedInterceptorMap, this.fixedInterceptorOffset));
enhancer.setCallbackTypes(types);

// Generate the proxy class and create a proxy instance.
return createProxyClassAndInstance(enhancer, callbacks);
}
catch (CodeGenerationException | IllegalArgumentException ex) {
throw new AopConfigException("Could not generate CGLIB subclass of " + this.advised.getTargetClass() +
": Common causes of this problem include using a final class or a non-visible class",
ex);
}
catch (Throwable ex) {
// TargetSource.getTarget() failed
throw new AopConfigException("Unexpected AOP exception", ex);
}
}

With the interceptor woven in, the next step is to see how it gets invoked.


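Entering the transaction

Let's be direct: set a breakpoint in the Controller. Before the business method is entered, execution first goes through TransactionInterceptor#invoke.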

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Determine the target class
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Run the method within a transaction
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

Well, the code is rather long:

@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {

// Get the transaction attribute source; if the attribute resolved below is null, the method is non-transactional
TransactionAttributeSource tas = getTransactionAttributeSource();
// Look up (with caching) the transaction attribute for the current method
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// Get the transaction manager
final TransactionManager tm = determineTransactionManager(txAttr);

// The reactive path goes through here; skip it for now
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException(
"Unsupported annotated transaction on suspending function detected: " + method +
". Use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}

// Get the PlatformTransactionManager, Spring's unified transaction manager abstraction
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Create a transaction or join the current one
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

Object retVal;
try {
// Invoke the target method, or the next interceptor in the AOP chain if there is one
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// On an exception, roll back the database transaction (or commit, if the rollback rules say not to roll back)
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}

// Vavr Try handling (not reactive); skip it for now as well
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}

// Commit the database transaction
commitTransactionAfterReturning(txInfo);
return retVal;
}
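// CallbackPreferringPlatformTransactionManager: a manager that drives the transaction through a callback
// (this is not the reactive path); not covered in this walkthrough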
else {
final ThrowableHolder throwableHolder = new ThrowableHolder();

// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
Object retVal = invocation.proceedWithInvocation();
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});

// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}
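The txAttr.rollbackOn(ex) check in the code above is what separates "roll back" from "commit anyway". A simplified model of that decision is sketched below. RollbackRules is a hypothetical class written for this article: it matches rules by exception class and picks the closest match in the inheritance chain, whereas Spring's own rule matching differs in detail. The fallback, rolling back only on RuntimeException and Error, is the documented default.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class RollbackRules {
    private final List<Class<? extends Throwable>> rollbackTypes = new ArrayList<>();
    private final List<Class<? extends Throwable>> noRollbackTypes = new ArrayList<>();

    RollbackRules rollbackFor(Class<? extends Throwable> type) {
        rollbackTypes.add(type);
        return this;
    }

    RollbackRules noRollbackFor(Class<? extends Throwable> type) {
        noRollbackTypes.add(type);
        return this;
    }

    // The closest matching rule wins; without any match the default applies.
    boolean rollbackOn(Throwable ex) {
        int best = Integer.MAX_VALUE;
        Boolean decision = null;
        for (Class<? extends Throwable> type : rollbackTypes) {
            int d = depth(ex.getClass(), type);
            if (d >= 0 && d < best) {
                best = d;
                decision = Boolean.TRUE;
            }
        }
        for (Class<? extends Throwable> type : noRollbackTypes) {
            int d = depth(ex.getClass(), type);
            if (d >= 0 && d < best) {
                best = d;
                decision = Boolean.FALSE;
            }
        }
        if (decision != null) {
            return decision;
        }
        // Default: unchecked exceptions and errors roll back, checked exceptions do not
        return ex instanceof RuntimeException || ex instanceof Error;
    }

    // Number of superclass steps from exceptionClass up to ruleType, or -1 if unrelated.
    private static int depth(Class<?> exceptionClass, Class<?> ruleType) {
        int d = 0;
        for (Class<?> c = exceptionClass; c != null; c = c.getSuperclass()) {
            if (c == ruleType) {
                return d;
            }
            d++;
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(new RollbackRules().rollbackOn(new ArithmeticException())); // true
        System.out.println(new RollbackRules().rollbackOn(new IOException()));         // false
        System.out.println(new RollbackRules().rollbackFor(Exception.class)
                .rollbackOn(new IOException()));                                       // true
    }
}
```

This is why the 1 / 0 in the demo service triggers a rollback with a bare @Transactional, while a checked exception would need something like rollback-for="java.lang.Exception" from the XML example.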

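Let's go through it step by step.

TransactionAttribute: transaction attributes

This is a metadata class that holds the transaction's propagation and isolation settings plus other information such as the timeout. The instance obtained here is a RuleBasedTransactionAttribute, a transaction attribute driven by rollback rules; by default it rolls back when a RuntimeException (or an Error) is thrown. Its parent is TransactionDefinition, a veteran of the framework, which defines the propagation behaviors and isolation levels a transaction manager commonly needs.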

public interface TransactionDefinition {

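// ------------------------------ Spring transaction propagation behaviors ------------------------------
// Join the current transaction; create a new one if none exists
int PROPAGATION_REQUIRED = 0;

// Join the current transaction; run non-transactionally if none exists
int PROPAGATION_SUPPORTS = 1;

// Join the current transaction; throw an exception if none exists
int PROPAGATION_MANDATORY = 2;

// Suspend the current transaction, if any, and create a new independent transaction
int PROPAGATION_REQUIRES_NEW = 3;

// Always run non-transactionally, suspending the current transaction if one exists
int PROPAGATION_NOT_SUPPORTED = 4;

// Run non-transactionally; throw an exception if a transaction exists
int PROPAGATION_NEVER = 5;

// Run within a nested transaction if a transaction exists;
// otherwise behave like PROPAGATION_REQUIRED
int PROPAGATION_NESTED = 6;

// ------------------------------ Isolation levels (same values as JDBC) ------------------------------

// Use the default level of the database
int ISOLATION_DEFAULT = -1;

// Read uncommitted
int ISOLATION_READ_UNCOMMITTED = 1; // same as java.sql.Connection.TRANSACTION_READ_UNCOMMITTED;

// Read committed (dirty reads are prevented; non-repeatable reads can still occur)
int ISOLATION_READ_COMMITTED = 2; // same as java.sql.Connection.TRANSACTION_READ_COMMITTED;


// Repeatable read
int ISOLATION_REPEATABLE_READ = 4; // same as java.sql.Connection.TRANSACTION_REPEATABLE_READ;

// Serializable: transactions behave as if executed one after another
int ISOLATION_SERIALIZABLE = 8; // same as java.sql.Connection.TRANSACTION_SERIALIZABLE;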


/**
* Use the default timeout of the underlying transaction system,
* or none if timeouts are not supported.
*/
int TIMEOUT_DEFAULT = -1;


/**
* Return the propagation behavior.
* <p>Must return one of the {@code PROPAGATION_XXX} constants
* defined on {@link TransactionDefinition this interface}.
* <p>The default is {@link #PROPAGATION_REQUIRED}.
* @return the propagation behavior
* @see #PROPAGATION_REQUIRED
* @see org.springframework.transaction.support.TransactionSynchronizationManager#isActualTransactionActive()
*/
default int getPropagationBehavior() {
return PROPAGATION_REQUIRED;
}

/**
* Return the isolation level.
* <p>Must return one of the {@code ISOLATION_XXX} constants defined on
* {@link TransactionDefinition this interface}. Those constants are designed
* to match the values of the same constants on {@link java.sql.Connection}.
* <p>Exclusively designed for use with {@link #PROPAGATION_REQUIRED} or
* {@link #PROPAGATION_REQUIRES_NEW} since it only applies to newly started
* transactions. Consider switching the "validateExistingTransactions" flag to
* "true" on your transaction manager if you'd like isolation level declarations
* to get rejected when participating in an existing transaction with a different
* isolation level.
* <p>The default is {@link #ISOLATION_DEFAULT}. Note that a transaction manager
* that does not support custom isolation levels will throw an exception when
* given any other level than {@link #ISOLATION_DEFAULT}.
* @return the isolation level
* @see #ISOLATION_DEFAULT
* @see org.springframework.transaction.support.AbstractPlatformTransactionManager#setValidateExistingTransaction
*/
default int getIsolationLevel() {
return ISOLATION_DEFAULT;
}

/**
* Return the transaction timeout.
* <p>Must return a number of seconds, or {@link #TIMEOUT_DEFAULT}.
* <p>Exclusively designed for use with {@link #PROPAGATION_REQUIRED} or
* {@link #PROPAGATION_REQUIRES_NEW} since it only applies to newly started
* transactions.
* <p>Note that a transaction manager that does not support timeouts will throw
* an exception when given any other timeout than {@link #TIMEOUT_DEFAULT}.
* <p>The default is {@link #TIMEOUT_DEFAULT}.
* @return the transaction timeout
*/
default int getTimeout() {
return TIMEOUT_DEFAULT;
}

/**
* Return whether to optimize as a read-only transaction.
* <p>The read-only flag applies to any transaction context, whether backed
* by an actual resource transaction ({@link #PROPAGATION_REQUIRED}/
* {@link #PROPAGATION_REQUIRES_NEW}) or operating non-transactionally at
* the resource level ({@link #PROPAGATION_SUPPORTS}). In the latter case,
* the flag will only apply to managed resources within the application,
* such as a Hibernate {@code Session}.
* <p>This just serves as a hint for the actual transaction subsystem;
* it will <i>not necessarily</i> cause failure of write access attempts.
* A transaction manager which cannot interpret the read-only hint will
* <i>not</i> throw an exception when asked for a read-only transaction.
* @return {@code true} if the transaction is to be optimized as read-only
* ({@code false} by default)
* @see org.springframework.transaction.support.TransactionSynchronization#beforeCommit(boolean)
* @see org.springframework.transaction.support.TransactionSynchronizationManager#isCurrentTransactionReadOnly()
*/
default boolean isReadOnly() {
return false;
}

/**
* Return the name of this transaction. Can be {@code null}.
* <p>This will be used as the transaction name to be shown in a
* transaction monitor, if applicable (for example, WebLogic's).
* <p>In case of Spring's declarative transactions, the exposed name will be
* the {@code fully-qualified class name + "." + method name} (by default).
* @return the name of this transaction ({@code null} by default}
* @see org.springframework.transaction.interceptor.TransactionAspectSupport
* @see org.springframework.transaction.support.TransactionSynchronizationManager#getCurrentTransactionName()
*/
@Nullable
default String getName() {
return null;
}


// Static builder methods

/**
* Return an unmodifiable {@code TransactionDefinition} with defaults.
* <p>For customization purposes, use the modifiable
* {@link org.springframework.transaction.support.DefaultTransactionDefinition}
* instead.
* @since 5.2
*/
static TransactionDefinition withDefaults() {
return StaticTransactionDefinition.INSTANCE;
}

}
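The seven propagation constants above boil down to a small decision table keyed on "does a transaction already exist?". The sketch below restates that table in plain Java; PropagationMode, Action, and PropagationRules are hypothetical names for this illustration, and the code models only the decision, not the suspend and resume mechanics.

```java
enum PropagationMode { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

enum Action {
    JOIN_EXISTING,
    START_NEW,
    SUSPEND_AND_START_NEW,
    RUN_WITHOUT_TX,
    SUSPEND_AND_RUN_WITHOUT_TX,
    NESTED_SAVEPOINT
}

class PropagationRules {
    // Decides what to do for a propagation mode, given whether a transaction is already active.
    static Action decide(PropagationMode mode, boolean txExists) {
        switch (mode) {
            case REQUIRED:
                return txExists ? Action.JOIN_EXISTING : Action.START_NEW;
            case SUPPORTS:
                return txExists ? Action.JOIN_EXISTING : Action.RUN_WITHOUT_TX;
            case MANDATORY:
                if (!txExists) {
                    throw new IllegalStateException("MANDATORY requires an existing transaction");
                }
                return Action.JOIN_EXISTING;
            case REQUIRES_NEW:
                return txExists ? Action.SUSPEND_AND_START_NEW : Action.START_NEW;
            case NOT_SUPPORTED:
                return txExists ? Action.SUSPEND_AND_RUN_WITHOUT_TX : Action.RUN_WITHOUT_TX;
            case NEVER:
                if (txExists) {
                    throw new IllegalStateException("NEVER does not allow an existing transaction");
                }
                return Action.RUN_WITHOUT_TX;
            case NESTED:
                return txExists ? Action.NESTED_SAVEPOINT : Action.START_NEW;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        for (PropagationMode mode : PropagationMode.values()) {
            for (boolean exists : new boolean[] {false, true}) {
                String outcome;
                try {
                    outcome = decide(mode, exists).toString();
                } catch (IllegalStateException e) {
                    outcome = "exception";
                }
                System.out.println(mode + ", existing=" + exists + " -> " + outcome);
            }
        }
    }
}
```

For the demo service, which uses the default REQUIRED with no transaction active when the controller calls it, the outcome is START_NEW.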

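Where is all this used? The handling will show up later, so let's set it aside for now.

Transaction manager

Nothing much happens in this step; it simply obtains a transaction manager.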

@Nullable
protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
// Do not attempt to lookup tx manager if no tx attributes are set
if (txAttr == null || this.beanFactory == null) {
return getTransactionManager();
}

String qualifier = txAttr.getQualifier();
if (StringUtils.hasText(qualifier)) {
return determineQualifiedTransactionManager(this.beanFactory, qualifier);
}
else if (StringUtils.hasText(this.transactionManagerBeanName)) {
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
}
else {
TransactionManager defaultTransactionManager = getTransactionManager();
if (defaultTransactionManager == null) {
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
if (defaultTransactionManager == null) {
defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
this.transactionManagerCache.putIfAbsent(
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
}
}
return defaultTransactionManager;
}
}

Starting the database transaction

As you can see, the transaction definition metadata is passed in to create the transaction.

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

// If no name is set, use the join point identification as the transaction name
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}

TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// Build the TransactionInfo for the transaction just started and bind it to the current thread
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

First, a TransactionStatus is created according to the current propagation behavior.

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {

    // Use the given TransactionDefinition, or fall back to the defaults if none was supplied.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    // Get the transaction object (it carries whatever transactional resource is already bound to this thread)
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    if (isExistingTransaction(transaction)) {
        // A transaction already exists: return a TransactionStatus according to the propagation behavior.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // Reject an invalid timeout (any value below TIMEOUT_DEFAULT).
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // No transaction exists yet: the propagation behavior of the definition decides how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        // MANDATORY requires an existing transaction, so throw
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }
        try {
            // Create a new TransactionStatus, then begin the database transaction
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // This DefaultTransactionStatus accompanies the transaction for its whole lifetime
            DefaultTransactionStatus status = newTransactionStatus(
                    def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Begin the new transaction
            doBegin(transaction, def);
            // Initialize the thread-bound synchronization state: active flag, read-only flag,
            // transaction name, isolation level and so on
            prepareSynchronization(status, def);
            return status;
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + def);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}
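
To make the branching easier to follow, here is a minimal sketch of the "no existing transaction" decision in plain Java. The names PropagationSketch, Propagation and Outcome are made up for this illustration and are not Spring types; only the grouping of propagation behaviors follows the source above.

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified model of the "no existing transaction" branch of getTransaction.
// Propagation and Outcome are illustrative types, not Spring classes.
class PropagationSketch {

    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    enum Outcome { START_NEW_TRANSACTION, RUN_WITHOUT_TRANSACTION }

    private static final Set<Propagation> STARTS_NEW =
            EnumSet.of(Propagation.REQUIRED, Propagation.REQUIRES_NEW, Propagation.NESTED);

    // Decide what happens when no transaction is bound to the current thread.
    static Outcome decideWithoutExistingTransaction(Propagation propagation) {
        if (propagation == Propagation.MANDATORY) {
            // Stands in for the IllegalTransactionStateException thrown in the source above
            throw new IllegalStateException("No existing transaction found for propagation 'mandatory'");
        }
        return STARTS_NEW.contains(propagation)
                ? Outcome.START_NEW_TRANSACTION
                : Outcome.RUN_WITHOUT_TRANSACTION;
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            try {
                System.out.println(p + " -> " + decideWithoutExistingTransaction(p));
            } catch (IllegalStateException ex) {
                System.out.println(p + " -> exception: " + ex.getMessage());
            }
        }
    }
}
```

Our @Transactional example uses the default propagation, REQUIRED, so with no transaction on the thread it takes the "start a new transaction" path and ends up in doBegin.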

OK, next let's see how doBegin manages the Connection:

// Begin the transaction
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        if (!txObject.hasConnectionHolder() ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // Obtain a Connection from the DataSource (the connection pool)
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            // Let a ConnectionHolder manage the current connection
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        // Apply the read-only flag and isolation level to the Connection, remembering the previous isolation level
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        txObject.setReadOnly(definition.isReadOnly());

        // Switch the Connection from auto-commit to manual commit if necessary
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }

        // If enforceReadOnly is enabled and this is a read-only transaction,
        // send SET TRANSACTION READ ONLY to the database
        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);

        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the ConnectionHolder to the current thread via TransactionSynchronizationManager
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }

    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}
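
The last step of doBegin, binding the ConnectionHolder to the thread, is what lets JdbcTemplate pick up the same Connection later. As a rough sketch of the idea (not Spring's actual implementation), a ThreadLocal map keyed by the DataSource is enough. ResourceBindingSketch and its methods are names invented for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of binding a connection holder to the current thread.
// ResourceBindingSketch is illustrative; Spring's real class is TransactionSynchronizationManager.
class ResourceBindingSketch {

    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bindResource(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already a value bound for key [" + key + "]");
        }
    }

    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbindResource(Object key) {
        return RESOURCES.get().remove(key);
    }

    public static void main(String[] args) throws InterruptedException {
        Object dataSource = new Object();          // stands in for the DataSource key
        String connectionHolder = "connection-1";  // stands in for the ConnectionHolder

        bindResource(dataSource, connectionHolder);
        System.out.println("same thread sees: " + getResource(dataSource));

        // Another thread has its own map, so it does not see the bound holder
        Thread other = new Thread(() ->
                System.out.println("other thread sees: " + getResource(dataSource)));
        other.start();
        other.join();

        unbindResource(dataSource);
        System.out.println("after unbind: " + getResource(dataSource));
    }
}
```

Because the binding is per thread, work handed off to another thread inside a transactional method does not see the bound connection and so runs outside this transaction.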

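Executing the Target Method

So that you don't have to scroll back up, here is the method again, with the point we have reached marked by the arrow: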

@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {

    // Get the transaction attribute source; if the resulting attribute is null, the method is non-transactional
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // Look up the transaction attribute for the current method (results are cached)
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // Determine the transaction manager
    final TransactionManager tm = determineTransactionManager(txAttr);

    // Reactive transaction managers take this path; skipped for now
    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
        ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
            if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
                throw new TransactionUsageException(
                        "Unsupported annotated transaction on suspending function detected: " + method +
                        ". Use TransactionalOperator.transactional extensions instead.");
            }
            ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
            if (adapter == null) {
                throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                        method.getReturnType());
            }
            return new ReactiveTransactionSupport(adapter);
        });
        return txSupport.invokeWithinTransaction(
                method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
    }

    // Cast to PlatformTransactionManager, Spring's unified imperative transaction manager abstraction
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Create a transaction or join the current one
        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            // Invoke the target method; if there is another interceptor in the AOP chain, it runs next
            // ---------------> the transaction info is ready, so the whole AOP chain can now run
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // An exception occurred: roll back the database transaction (if the rollback rules say so)
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // Clear the current TransactionInfo; if this call was nested inside another
            // transactional method, the previous TransactionInfo is restored
            cleanupTransactionInfo(txInfo);
        }

        // Vavr Try return values (not reactive); also skipped for now
        if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
            // Set rollback-only in case of Vavr failure matching our rollback rules...
            TransactionStatus status = txInfo.getTransactionStatus();
            if (status != null && txAttr != null) {
                retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
        }

        // Commit the database transaction
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
    // CallbackPreferringPlatformTransactionManager: a manager that drives the
    // transaction through a callback (this is not the reactive path)
    else {
        // Callback-based execution omitted.........
    }
}
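
Stripped of the Spring types, the imperative branch is a plain begin / proceed / rollback-or-commit template. The sketch below records the order of the steps. TransactionTemplateSketch is a made-up class for this illustration, and it simplifies one thing: it always rolls back on an exception, whereas the real completeTransactionAfterThrowing first consults the rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Simplified model of the begin / proceed / rollback-or-commit template.
// All names here are illustrative, not Spring APIs.
class TransactionTemplateSketch {

    final List<String> events = new ArrayList<>();

    <T> T invokeWithinTransaction(Callable<T> businessMethod) throws Exception {
        events.add("begin");                 // createTransactionIfNecessary
        T result;
        try {
            result = businessMethod.call();  // invocation.proceedWithInvocation()
        } catch (Throwable ex) {
            events.add("rollback");          // completeTransactionAfterThrowing (simplified)
            throw ex;
        } finally {
            events.add("cleanup");           // cleanupTransactionInfo
        }
        events.add("commit");                // commitTransactionAfterReturning
        return result;
    }

    public static void main(String[] args) throws Exception {
        TransactionTemplateSketch ok = new TransactionTemplateSketch();
        ok.invokeWithinTransaction(() -> "saved");
        System.out.println(ok.events);

        TransactionTemplateSketch failing = new TransactionTemplateSketch();
        try {
            failing.invokeWithinTransaction(() -> 1 / Integer.parseInt("0"));
        } catch (ArithmeticException ex) {
            System.out.println(failing.events);
        }
    }
}
```

Note the order on the success path: the finally block runs before the commit, so the thread-bound TransactionInfo is already restored by the time the transaction is committed.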

Normal Commit

Get the transaction manager and commit the transaction:

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}

Several checks decide whether a real commit actually happens:

@Override
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;

    // If user code called TransactionAspectSupport.currentTransactionStatus().setRollbackOnly()
    // (e.g. validation failed but no exception was thrown), roll back instead of committing
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }

    // The transaction has been marked rollback-only globally (for example by a participating
    // inner transaction that failed), so roll back even though the code requested a commit
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    // Perform the actual commit
    processCommit(defStatus);
}
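
The first check is the one you hit when business code marks the transaction rollback-only without throwing. Here is a minimal model of that behavior, assuming nothing beyond what the method above shows. StatusSketch and ManagerSketch are hypothetical stand-ins for DefaultTransactionStatus and the transaction manager.

```java
// Simplified model of commit() honoring a local rollback-only flag.
// StatusSketch and ManagerSketch are illustrative stand-ins, not Spring classes.
class RollbackOnlySketch {

    static class StatusSketch {
        boolean rollbackOnly;
        boolean completed;

        void setRollbackOnly() {
            this.rollbackOnly = true;
        }
    }

    static class ManagerSketch {
        String lastAction = "none";

        void commit(StatusSketch status) {
            if (status.completed) {
                throw new IllegalStateException("Transaction is already completed");
            }
            // A commit request on a rollback-only status is turned into a rollback
            lastAction = status.rollbackOnly ? "rollback" : "commit";
            status.completed = true;
        }
    }

    public static void main(String[] args) {
        ManagerSketch manager = new ManagerSketch();

        StatusSketch valid = new StatusSketch();
        manager.commit(valid);
        System.out.println("valid input -> " + manager.lastAction);

        StatusSketch invalid = new StatusSketch();
        invalid.setRollbackOnly(); // e.g. validation failed, but no exception is thrown
        manager.commit(invalid);
        System.out.println("invalid input -> " + manager.lastAction);
    }
}
```

So calling commit does not guarantee a commit: the caller only asks for completion, and the status flags decide which way it goes.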

This is where the commit is processed. All of the commit lifecycle callbacks we use in our projects are invoked from here:

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;
            // The next three calls fire the corresponding lifecycle callbacks
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;

            // Savepoints: skipped for now
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            }
            // This is a new transaction
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                // The place where the commit really happens
                doCommit(status);
            }
            else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            }
            else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            // Fire the afterCommit lifecycle callbacks
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    }
    finally {
        cleanupAfterCompletion(status);
    }
}
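
The order of the lifecycle callbacks on the happy path is easy to lose in all the error handling, so here is a small sketch that only models that order. SynchronizationSketch is an invented interface loosely modeled on the callbacks named above; it is not Spring's TransactionSynchronization, and the error branches are left out on purpose.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified model of the callback order on a successful commit.
// SynchronizationSketch is illustrative, not Spring's TransactionSynchronization.
class CommitLifecycleSketch {

    interface SynchronizationSketch {
        default void beforeCommit() {}
        default void beforeCompletion() {}
        default void afterCommit() {}
        default void afterCompletion(String status) {}
    }

    static void processCommit(List<SynchronizationSketch> syncs, Runnable doCommit) {
        syncs.forEach(SynchronizationSketch::beforeCommit);
        syncs.forEach(SynchronizationSketch::beforeCompletion);
        doCommit.run();
        try {
            syncs.forEach(SynchronizationSketch::afterCommit);
        } finally {
            // afterCompletion still runs if an afterCommit callback throws
            syncs.forEach(s -> s.afterCompletion("COMMITTED"));
        }
    }

    // Runs one commit with a recording callback and returns the observed order.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SynchronizationSketch recorder = new SynchronizationSketch() {
            @Override public void beforeCommit() { log.add("beforeCommit"); }
            @Override public void beforeCompletion() { log.add("beforeCompletion"); }
            @Override public void afterCommit() { log.add("afterCommit"); }
            @Override public void afterCompletion(String status) { log.add("afterCompletion(" + status + ")"); }
        };
        processCommit(Collections.singletonList(recorder), () -> log.add("doCommit"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The practical consequence is that anything done in an afterCommit callback runs once the commit has already happened: as the original comment in processCommit says, an exception thrown there propagates to the caller but the transaction is still considered committed.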

Get the Connection and call its commit method:

@Override
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {
        con.commit();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not commit JDBC transaction", ex);
    }
}

Rolling Back

Add a division by zero to the business code:

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Override
    @Transactional
    public void add(String name, String orgName) {
        String userId = UUID.randomUUID().toString();
        jdbcTemplate.update("insert into user_info(user_uuid, user_name)\n" +
                "values (?,?);", userId, name);
        // Throws ArithmeticException (a RuntimeException) to trigger the rollback
        int i = 1 / 0;
        String orgUUID = UUID.randomUUID().toString();
        jdbcTemplate.update("insert into org_info(org_uuid, org_name, user_uuid)\n" +
                "VALUES (?, ?, ?);", orgUUID, orgName, userId);
    }

}

Now that an exception is thrown, execution lands in the catch branch of the try-catch:

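try {
    // Invoke the target method; if there is another interceptor in the AOP chain, it runs next
    retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
    // An exception occurred: roll back the database transaction
    completeTransactionAfterThrowing(txInfo, ex);
    throw ex;
}
finally {
    cleanupTransactionInfo(txInfo);
}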

This one is much simpler:

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                    "] after exception: " + ex);
        }
        // Check whether the thrown exception should trigger a rollback
        if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
        }
        // Otherwise no rollback is required, so go ahead and commit
        else {
            // We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            try {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
        }
    }
}
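
Whether the rollback or the commit branch runs depends on rollbackOn(ex). As far as I know, when no rollback rules are configured on @Transactional, the default rule is "roll back on RuntimeException and Error, commit on checked exceptions". The sketch below models just that rule; RollbackRuleSketch is a made-up class, not the Spring attribute implementation.

```java
import java.io.IOException;

// Simplified model of the default rollback rule (assumed here: unchecked exceptions
// and errors roll back, checked exceptions commit). Not a Spring class.
class RollbackRuleSketch {

    static boolean rollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
    }

    // Which branch of completeTransactionAfterThrowing would run for this exception
    static String complete(Throwable ex) {
        return rollbackOn(ex) ? "rollback" : "commit";
    }

    public static void main(String[] args) {
        // The 1 / 0 in our example raises ArithmeticException, a RuntimeException
        System.out.println("ArithmeticException -> " + complete(new ArithmeticException("/ by zero")));
        // A checked exception does not trigger a rollback under the default rule
        System.out.println("IOException -> " + complete(new IOException("disk full")));
    }
}
```

This is why a method that throws a checked exception still commits unless you configure a rule such as rollbackFor on the annotation.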

All right, now we arrive at the rollback method:

@Override
public final void rollback(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    processRollback(defStatus, false);
}

It follows the same pattern as commit, firing a series of lifecycle callbacks:

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;

        try {
            triggerBeforeCompletion(status);

            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                status.rollbackToHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                doRollback(status);
            }
            else {
                // Participating in larger transaction
                if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }
                        doSetRollbackOnly(status);
                    }
                    else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                }
                else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
                // Unexpected rollback only matters here if we're asked to fail early
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = false;
                }
            }
        }
        catch (RuntimeException | Error ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

        // Raise UnexpectedRollbackException if we had a global rollback-only marker
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
                    "Transaction rolled back because it has been marked as rollback-only");
        }
    }
    finally {
        cleanupAfterCompletion(status);
    }
}
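
The "participating in larger transaction" branch explains a surprise many people run into: an inner transactional method fails, the outer method swallows the exception, and the outer commit still ends in an UnexpectedRollbackException. A rough model of that interplay follows, assuming global rollback on participation failure is enabled (which I believe is the default) and fail-early is off. Every type in it is invented for the illustration.

```java
// Simplified model of an inner (participating) transaction failing inside an outer one.
// All types here are illustrative; only the decision logic follows the source above.
class ParticipationSketch {

    static class SharedTransaction {
        boolean rollbackOnly;          // plays the role of the global rollback-only marker
        String outcome = "active";
    }

    static class UnexpectedRollbackSketchException extends RuntimeException {
        UnexpectedRollbackSketchException(String message) {
            super(message);
        }
    }

    static void rollback(SharedTransaction tx, boolean newTransaction) {
        if (newTransaction) {
            tx.outcome = "rolled back";   // the originator really rolls back
        } else {
            tx.rollbackOnly = true;       // a participant only marks the shared transaction
        }
    }

    static void commit(SharedTransaction tx, boolean newTransaction) {
        if (tx.rollbackOnly) {
            rollback(tx, newTransaction);
            if (newTransaction) {
                throw new UnexpectedRollbackSketchException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
            return;
        }
        if (newTransaction) {
            tx.outcome = "committed";
        }
    }

    public static void main(String[] args) {
        SharedTransaction tx = new SharedTransaction();
        rollback(tx, false);            // inner method failed and its exception was swallowed
        try {
            commit(tx, true);           // outer method returns normally and asks for a commit
        } catch (UnexpectedRollbackSketchException ex) {
            System.out.println(tx.outcome + ": " + ex.getMessage());
        }
    }
}
```

The participant never touches the Connection itself. It only leaves a marker, and the originator of the transaction acts on it when it tries to commit.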

Get the Connection and roll back:

@Override
protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
    }
    try {
        con.rollback();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
    }
}