Seata 分支事务

引言

前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文先来介绍 Seata 中分支事务的整体实现思想,其他 Seata 相关文章均收录于 <Seata系列文章>中。

Branch Type

我们已经知道在 Seata 中, 分支事务分 AT 模式和 TCC 模式, 那么, Seata 是怎么区分出 AT 模式和 TCC 模式的呢? 这也借助了 Spring 的 AOP 特性, 我们在 TM 中介绍的 GlobalTransactionalInterceptor 实际上只负责 AT 模式, TCC 模式是另一套拦截器实现, 而这两种拦截器的注入, 全都是在 GlobalTransactionScanner 中进行的, 也就是说, 我们的 Spring 项目要将 GlobalTransactionScanner 注册为 Bean, 因为其继承自 AbstractAutoProxyCreator, Spring 在处理 AOP 过程时, 就会自动将 GlobalTransactionScanner 执行。接下来, 我们看看整个 Seata 的入口 GlobalTransactionScanner 的核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAware, DisposableBean, BeanPostProcessor {
// 只保留核心代码...

// 判断是否需要嵌入 Seata 的 AOP 代码
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
if (disableGlobalTransaction) {
return bean;
}
try {
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy, 实际上是根据 TCC 注解 TwoPhaseBusinessAction + RPC 协议类型进行判断, TCC 目前只支持一些特定的协议
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
// 识别出 TCC 模式, 使用 TCC 拦截器
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
} else {
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
// 判断是不是 AT 模式, 通过注解 GlobalTransactional GlobalLock
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}
// 识别出AT 模式, 使用前面提到的 GlobalTransactionalInterceptor
if (interceptor == null) {
interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
}
}

LOGGER.info(
"Bean[" + bean.getClass().getName() + "] with name [" + beanName + "] would use interceptor ["
+ interceptor.getClass().getName() + "]");
if (!AopUtils.isAopProxy(bean)) {
// 如果该类不由 Spring 管控, 则无能为力
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
// 如果是由 Spring 管控, 将 TCC 拦截器 或者 AT 拦截器注入
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, new Object[]{interceptor});
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}
// 判断是不是 AT 模式, 通过注解 GlobalTransactional GlobalLock
private boolean existsAnnotation(Class<?>[] classes) {
if (classes != null && classes.length > 0) {
for (Class clazz : classes) {
if (clazz == null) {
continue;
}
Method[] methods = clazz.getMethods();
for (Method method : methods) {
GlobalTransactional trxAnno = method.getAnnotation(GlobalTransactional.class);
if (trxAnno != null) {
return true;
}

GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
if (lockAnno != null) {
return true;
}
}
}
}
return false;
}
// 替换默认的数据库连接源, 改为 AT 模式的数据源代理
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [" + beanName + "]");
}
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
return Enhancer.create(bean.getClass(), (org.springframework.cglib.proxy.MethodInterceptor) (o, method, args, methodProxy) -> {
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (null != m) {
return m.invoke(dataSourceProxy, args);
} else {
return method.invoke(bean, args);
}
});
}
return bean;
}
}

整个项目是从这个 GlobalTransactionScanner 作为起点接入 Seata 的, 一般来说 SpringBoot 会用 @Configuration 类将其注册为 @Bean, 原生 Spring 项目需要在在 XML 中将其注册为 Bean,这一点我们从官方的 Sample 中就能发现。

1
2
3
4
5
6
7
8
9
10
11
12
13
// SpringBoot
@Configuration
public class SeataAutoConfig {
/**
* init global transaction scanner
*
* @Return: GlobalTransactionScanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner(){
return new GlobalTransactionScanner("account-gts-seata-example", "my_test_tx_group");
}
}

Spring 配置如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<!--省略其他内容-->
<bean class="io.seata.spring.annotation.GlobalTransactionScanner">
<constructor-arg value="dubbo-demo-app"/>
<constructor-arg value="my_test_tx_group"/>
</bean>

</beans>

好了, 想必大家应该已经清楚 Seata 中是如何发现 TCC 模式和 AT 模式的了, 我们总结一下:

  1. 扫描 Spring 代理的类
  2. 如果发现带有 TCC 的注解, 并且 RPC 协议满足条件, 那么走 TCC 拦截器
  3. 如果发现带有 AT 的注解, 那么走 AT 拦截器
  4. 否则, 什么都不干

接下来我们, 分别看一下这两种模式各自都是如何运作起来的。

参考内容

[1] fescar锁设计和隔离级别的理解
[2] 分布式事务中间件 Fescar - RM 模块源码解读
[3] Fescar分布式事务实现原理解析探秘
[4] Seata TCC 分布式事务源码分析
[5] 深度剖析一站式分布式事务方案 Seata-Server
[6] 分布式事务 Seata Saga 模式首秀以及三种模式详解
[7] 蚂蚁金服大规模分布式事务实践和开源详解
[8] 分布式事务 Seata TCC 模式深度解析
[9] Fescar (Seata)0.4.0 中文文档教程
[10] Seata Github Wiki
[11] 深度剖析一站式分布式事务方案Seata(Fescar)-Server

贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/middleware/seata/bt/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。