【Seata】来自阿里开源分布式事务框架-Seata的原理

InterviewCoder

# 【Seata】来自阿里开源分布式事务框架 - Seata 的原理

首先 Seata 客户端启动一般分为以下几个流程:

  1. 自动加载 Bean 属性和配置信息
  2. 初始化 TM
  3. 初始化 RM
  4. 初始化分布式事务客户端完成,完成代理数据库配置
  5. 连接 TC (Seata 服务端),注册 RM 和 TM
  6. 开启全局事务

在这篇源码的讲解中,我们主要以 AT 模式为主导,官网也是主推 AT 模式,我们在上篇的文章中也讲解过,感兴趣的小伙伴可以去看一看分布式事务 (Seata) 四大模式详解,在官网中也提供了对应的流程地址:https://seata.io/zh-cn/docs/dev/mode/at-mode.html ,在这里我们只是做一些简单的介绍,AT 模式主要分为两个阶段:

一阶段:

  • 解析 SQL,获取 SQL 类型(CRUD)、表信息、条件 (where) 等相关信息
  • 查询前镜像 (改变之前的数据),根据解析得到的条件信息,生成查询语句,定位数据
  • 执行业务 SQL,更新数据
  • 查询后镜像(改变后的数据),根据前镜像的结果,通过主键都给你为数据
  • 插入回滚日志,将前后镜像数据以及业务 SQL 等信息,组织成一条回滚日志记录,插入到 undo Log 表中
  • 提交前,向 TC 注册分支,申请全局锁
  • 本地事务提交,业务数据的更细腻和生成的 undoLog 一起提交
  • 将本地事务提交的结果通知给 TC

二阶段:

如果 TC 收到的是回滚请求

  • 开启本地事务,通过 XID 和 BranchID 查找到对应的 undo Log 记录
  • 根据 undoLog 中的前镜像和业务 SQL 的相关信息生成并执行回滚语句
  • 提交本地事务,将本地事务的执行结果(分支事务回滚的信息)通知给 TC

如果没问题,执行提交操作

  • 收到 TC 分支提交请求,将请求放入到一个异步任务的队列中,马上返回提交成功的结果给 TC
  • 异步任务阶段的分支提交请求删除 undoLog 中记录

img

# 源码入口

接下来,我们就需要从官网中去下载源码,下载地址:https://seata.io/zh-cn/blog/download.html,选择 source 即可,下载完成之后,通过 IDEA 打开项目。

img

源码下载下来之后,我们应该如何去找入口呢?首先我们需要找到对应引入的 Seataspring-alibaba-seata ,我们在回想一下,我们开启事务的时候,是不是添加过一个 @GlobalTransactional 的注解,这个注解就是我们入手的一个点,我们在 spring.factories 中看到有一个 GlobalTransactionAutoConfiguration ,这个就是我们需要关注的点,也就是我们源码的入口

img

GlobalTransactionAutoConfiguration 中我们找到一个用 Bean 注入的方法 globalTransactionScanner ,这个就是全局事务扫描器,这个类型主要负责加载配置,注入相关的 Bean

img

这里给大家展示了当前 GlobalTransactionScanner 的类关系图,其中我们现在继承了 Aop 的 AbstractAutoProxyCreator 类型,在这其中有一个重点方法,这个方法就是判断 Bean 对象是否需要代理,是否需要增强。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
@EnableConfigurationProperties(SeataProperties.class)
public class GlobalTransactionAutoConfiguration {

//全局事务扫描器
@Bean
public GlobalTransactionScanner globalTransactionScanner() {

String applicationName = applicationContext.getEnvironment()
.getProperty("spring.application.name");

String txServiceGroup = seataProperties.getTxServiceGroup();

if (StringUtils.isEmpty(txServiceGroup)) {
txServiceGroup = applicationName + "-fescar-service-group";
seataProperties.setTxServiceGroup(txServiceGroup);
}
// 构建全局扫描器,传入参数:应用名、事务分组名,失败处理器
return new GlobalTransactionScanner(applicationName, txServiceGroup);
}

}

在这其中我们要关心的是 GlobalTransactionScanner 这个类型,这个类型扫描 @GlobalTransactional 注解,并对代理方法进行拦截增强事务的功能。我们就从源码中搜索这个 GlobalTransactionScanner 类,看看里面具体是做了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
/**
* The type Global transaction scanner.
* 全局事务扫描器
* @author slievrly
*/
public class GlobalTransactionScanner
//AbstractAutoProxyCreator AOP动态代理 增强Bean
extends AbstractAutoProxyCreator
/**
* ConfigurationChangeListener: 监听器基准接口
* InitializingBean: Bean初始化
* ApplicationContextAware: Spring容器
* DisposableBean: Spring 容器销毁
*/
implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {

private final String applicationId;//服务名
private final String txServiceGroup;//事务分组

private void initClient() {
//启动日志
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
//检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
"please change your default configuration as soon as possible " +
"and we don't recommend you to use default tx-service-group's value provided by seata",
DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
}
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
//初始化TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//init RM
//初始化RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}

if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();

}

@Override
public void afterPropertiesSet() {
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)this);
return;
}
if (initialized.compareAndSet(false, true)) {
initClient();
}
}

private void initClient() {
//启动日志
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Initializing Global Transaction Clients ... ");
}
//检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " +
"please change your default configuration as soon as possible " +
"and we don't recommend you to use default tx-service-group's value provided by seata",
DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
}

//检查应用名以及事务分组名,为空抛出异常IllegalArgumentException
if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
//初始化TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//init RM
//初始化RM
RMClient.init(applicationId, txServiceGroup);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}

if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global Transaction Clients are initialized. ");
}
registerSpringShutdownHook();

}

//代理增强,Spring 所有的Bean都会经过这个方法
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
// do checkers
//检查bean和beanName
if (!doCheckers(bean, beanName)) {
return bean;
}

try {
//加锁防止并发
synchronized (PROXYED_SET) {
if (PROXYED_SET.contains(beanName)) {
return bean;
}
interceptor = null;
//check TCC proxy
//检查是否为TCC模式
if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
// init tcc fence clean task if enable useTccFence
//如果启用useTccFence 失败 ,则初始化TCC清理任务
TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
//如果是,添加TCC拦截器
interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)interceptor);
} else {
//不是TCC
Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

//判断是否有相关事务注解,如果没有不进行代理
if (!existsAnnotation(new Class[]{serviceInterface})
&& !existsAnnotation(interfacesIfJdk)) {
return bean;
}

//发现存在全局事务注解标注的Bean对象,添加拦截器
if (globalTransactionalInterceptor == null) {
//添加拦截器
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}

LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
//检查是否为代理对象
if (!AopUtils.isAopProxy(bean)) {
//不是代理对象,调用父级
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
//是代理对象,反射获取代理类中已经存在的拦截器组合,然后添加到这个集合中
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
int pos;
for (Advisor avr : advisor) {
// Find the position based on the advisor's order, and add to advisors by pos
pos = findAddSeataAdvisorPosition(advised, avr);
advised.addAdvisor(pos, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
} catch (Exception exx) {
throw new RuntimeException(exx);
}
}


}

InitializingBean`:中实现了一个 `afterPropertiesSet()`方法,在这个方法中,调用了`initClient()

AbstractAutoProxyCreator :APO 动态代理,在之前的的 Nacos 和 Sentiel 中都有这个代理类,AOP 在我们越往深入学习,在学习源码的会见到的越来越多,越来越重要,很多相关代理,都是通过 AOP 进行增强,在这个类中,我们需要关注有一个 wrapIfNecessary() 方法, 这个方法主要是判断被代理的 bean 或者类是否需要代理增强,在这个方法中会调用 GlobalTransactionalInterceptor.invoke() 进行带来增强。

具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {

public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
DEFAULT_DISABLE_GLOBAL_TRANSACTION);
this.order =
ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);
degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK,
DEFAULT_TM_DEGRADE_CHECK);
if (degradeCheck) {
ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
degradeCheckPeriod = ConfigurationFactory.getInstance()
.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
degradeCheckAllowTimes = ConfigurationFactory.getInstance()
.getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
EVENT_BUS.register(this);
if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
startDegradeCheck();
}
}
this.initDefaultGlobalTransactionTimeout();
}

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
//获取执行的方法
Class<?> targetClass =
methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
//获取GlobalTransactional(全局事务)、GlobalLock(全局锁)元数据
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
//GlobalLock会将本地事务的执行纳入Seata分布式事务的管理,共同竞争全局锁
//保证全局事务在执行的时候,本地事务不可以操作全局事务的记录
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);//获取全局锁
boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.propagation(),
globalTransactionalAnnotation.lockRetryInterval(),
globalTransactionalAnnotation.lockRetryTimes());
} else {
transactional = this.aspectTransactional;
}
//执行全局事务
return handleGlobalTransaction(methodInvocation, transactional);
} else if (globalLockAnnotation != null) {
//执行全局锁
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}

}

具体流程图如下所示:

img

# 核心源码

在上面我们讲解到 GlobalTransactionalInterceptor 作为全局事务拦截器,一旦执行拦截,就会进入 invoke 方法,其中,我们会做 @GlobalTransactional 注解的判断,如果有这个注解的存在,会执行全局事务和全局锁,再执行全局事务的时候会调用 handleGlobalTransaction 全局事务处理器,获取事务信息,那我们接下来就来看一下 GlobalTransactionalInterceptor.handleGlobalTransaction 到底是如何执行全局事务的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final AspectTransactional aspectTransactional) throws Throwable {
boolean succeed = true;
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}

//获取事务名称,默认获取方法名
public String name() {
String name = aspectTransactional.getName();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}

/**
* 解析GlobalTransation注解属性,封装对对象
* @return
*/
@Override
public TransactionInfo getTransactionInfo() {
// reset the value of timeout
//获取超时时间,默认60秒
int timeout = aspectTransactional.getTimeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
timeout = defaultGlobalTransactionTimeout;
}

//构建事务信息对象
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);//超时时间
transactionInfo.setName(name());//事务名称
transactionInfo.setPropagation(aspectTransactional.getPropagation());//事务传播
transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());//校验或占用全局锁重试间隔
transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());//校验或占用全局锁重试次数
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
//其他构建信息
for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (String rbRule : aspectTransactional.getRollbackForClassName()) {
rollbackRules.add(new RollbackRule(rbRule));
}
for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
rollbackRules.add(new NoRollbackRule(rbRule));
}
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
//执行异常
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
succeed = false;
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
} finally {
if (degradeCheck) {
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}

在这里我们,主要关注一个重点方法 execute() ,这个方法主要用来执行事务的具体流程:

  • 获取事务信息
  • 执行全局事务
  • 发生异常全局回滚,各个数据通过 UndoLog 进行事务补偿
  • 全局事务提交
  • 清除所有资源

这个位置也是一个非常核心的一个位置,因为我们所有的业务进来以后都会去走这个位置,具体源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. Get transactionInfo
//获取事务信息
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
//获取当前事务,主要获取XID
GlobalTransaction tx = GlobalTransactionContext.getCurrent();

// 1.2 Handle the transaction propagation.
//根据配置的不同事务传播行为,执行不同的逻辑
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}

// 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
//如果当前事务为空,创建一个新的事务
if (tx == null) {
tx = GlobalTransactionContext.createNew();
}

// set current tx config to holder
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

try {
// 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
// else do nothing. Of course, the hooks will still be triggered.
//开始执行全局事务
beginTransaction(txInfo, tx);

Object rs;
try {
// Do Your Business
// 执行当前业务逻辑
//1、在TC注册当前分支事务,TC会在branch_table中插入一条分支事务数据
//2、执行本地update语句,并在执行前后查询数据状态,并把数据前后镜像存入到undo_log中
//3、远程调用其他应用,远程应用接收到XID,也会注册分支事务,写入branch_table以及本地undo_log表
//4、会在lock_table表中插入全局锁数据(一个分支一条)
rs = business.execute();
} catch (Throwable ex) {
// 3. The needed business exception to rollback.
//发生异常全局回滚,每个事务通过undo_log表进行事务补偿
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}

// 4. everything is fine, commit.
//全局提交
commitTransaction(tx);

return rs;
} finally {
//5. clear
//清理所有资源
resumeGlobalLockConfig(previousConfig);
triggerAfterCompletion();
cleanUp();
}
} finally {
// If the transaction is suspended, resume it.
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}

这其中的第三步和第四步其实在向 TC(Seata-Server)发起全局事务的提交或者回滚,在这里我们首先关注执行全局事务的 beginTransaction() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
// 向TC发起请求,采用模板模式
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
try {
triggerBeforeBegin();
//对TC发起请求
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);

}
}

在来关注其中,向 TC 发起请求的 tx.begin() 方法,而调用 begin() 方法的类为: DefaultGlobalTransaction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public void begin(int timeout, String name) throws TransactionException {
//判断调用者是否为TM
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
}
return;
}
assertXIDNull();
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," +
" can't begin a new global transaction, currentXid = " + currentXid);
}
//获取XID
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
//绑定XID
RootContext.bind(xid);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction [{}]", xid);
}
}

再来看一下 transactionManager.begin() 方法,这个时候使用的是 DefaultTransactionManager.begin 默认的事务管理者,来获取 XID,传入事务相关的信息 ,最好 TC 返回对应的全局事务 XID,它调用的是 DefaultTransactionManager.begin() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
//发送请求得到响应
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
//返回XID
return response.getXid();
}

在这里我们需要关注一个 syncCall ,在这里采用的是 Netty 通讯方式

1
2
3
4
5
6
7
8
private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
try {
// 通过Netty发送请求
return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}

具体图解如下:

img

在这里我们需要重点了解 GlobalTransactionScanner 这个类型,在这个类型中继承了一些接口和抽象类,这个类主要作用就是扫描有注解的 Bean,并做 AOP 增强。

  • ApplicationContextAware :继承这个类型以后,需要实现其方法 setApplicationContext() ,当 Spring 启动完成以后,会自动调用这个类型,将 ApplicationContextbean ,也就是说, GlobalTransactionScanner 能够很自然的使用 Spring 环境
  • InitializingBean : 继承这个接口,需要实现 afterPropertiesSet() ,但凡是继承这个接口的类,在初始化的时候,当所有的 properties 设置完成以后,会执行这个方法
  • DisposableBean : 这个类,实现了一个 destroy() 这个方法是在销毁的时候去调用
  • AbstractAutoProxyCreator : 这个类是 Spring 实现 AOP 的一种方式,本质上是一个 BeanPostProcessor ,在 Bean 初始化至去年,调用内部 createProxy() ,创建一个 Bean 的 AOP 代理 Bean 并返回,对 Bean 进行增强。

# Seata 数据源代理

在上面的环节中,我们讲解了 Seata AT 模式 2PC 的执行流程,那么现在我们就来带大家了解一下关于 AT 数据源代理的信息,这也是 AT 模式中非常关键的一个重要知识点,大家可以拿起小本子,记下来。

首先 AT 模式的核心主要分为一下两个

  • 开启全局事务,获取全局锁。
  • 解析 SQL 并写入 undoLog 中。

关于第一点我们已经分析清楚了,第二点就是关于 AT 模式如何解析 SQL 并写入 undoLog 中,但是在这之前,我们需要知道 Seata 是如何选择数据源,并进行数据源代理的。虽然全局事务拦截成功后最终还是执行了业务方法进行 SQL 提交和操作,但是由于 Seata 对数据源进行了代理,所以 SQL 的解析和 undoLog 的操作,是在数据源代理中进行完成的。

数据源代理是 Seata 中一个非常重要的知识点,在分布式事务运行过程中,undoLog 的记录、资源的锁定,用户都是无感知的,因为这些操作都是数据源的代理中完成了,恰恰是这样,我们才要去了解,这样不仅有利于我们了解 Seata 的核心操作,还能对以后源码阅读有所帮助,因为其实很多底层代码都会去使用这样用户无感知的方式 (代理) 去实现。

同样,我们在之前的寻找源码入口的时候,通过我们项目中引入的 jar 找到一个 SeataAutoConfiguration 类,我们在里面找到一个 SeataDataSourceBeanPostProcessor() ,这个就是我们数据源代理的入口方法

img

我们进入 SeataDataSourceBeanPostProcessor 类里面,发现继承了一个 BeanPostProcessor , 这个接口我们应该很熟悉,这个是 Sprng 的拓展接口,所有的 Bean 对象,都有进入两个方法 postProcessAfterInitialization()postProcessBeforeInitialization() 这两个方法都是由 BeanPostProcessor 提供的,这两个方法,一个是初始化之前执行 Before 。一个是在初始化之后执行 After ,主要用来对比我们的的 Bean 是否为数据源代理对象。

在这里我们需要关注到一个 postProcessAfterInitialization.proxyDataSource() 方法,这个里面

1
2
3
4
5
6
7
8
9
private Object proxyDataSource(Object originBean) {
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) originBean);
if (this.useJdkProxy) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), SpringProxyUtils.getAllInterfaces(originBean), (proxy, method, args) -> handleMethodProxy(dataSourceProxy, method, args, originBean));
} else {
return Enhancer.create(originBean.getClass(), (MethodInterceptor) (proxy, method, args, methodProxy) -> handleMethodProxy(dataSourceProxy, method, args, originBean));
}

}

这里有一个 DataSourceProxy 代理对象,我们需要看的就是这个类,这个就是我们数据库代理的对象,我们从我们下载的源码项目中,搜索这个代理对象,当我们打开这个类的目录时发现,除了这个,还有 ConnectionProxy 连接对象、 StatementProxyPreparedStatementProxy SQL 执行对象,这些都被 Seata 进行了代理,为什么要对这些都进行代理,代理的目的其实为了执行 Seata 的业务逻辑,生成 undoLog,全局事务的开启,事务的提交回滚等操作

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
DataSourceProxy` 具体做了什么,主要功能有哪些,我们来看一下。他在源码中是如何体现的,我们需要关注的是`init()
public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {

private String resourceGroupId;

private void init(DataSource dataSource, String resourceGroupId) {
//资源组ID,默认是“default”这个默认值
this.resourceGroupId = resourceGroupId;
try (Connection connection = dataSource.getConnection()) {
//根据原始数据源得到JDBC连接和数据库类型
jdbcUrl = connection.getMetaData().getURL();
dbType = JdbcUtils.getDbType(jdbcUrl);
if (JdbcConstants.ORACLE.equals(dbType)) {
userName = connection.getMetaData().getUserName();
} else if (JdbcConstants.MARIADB.equals(dbType)) {
dbType = JdbcConstants.MYSQL;
}
} catch (SQLException e) {
throw new IllegalStateException("can not init dataSource", e);
}
initResourceId();
DefaultResourceManager.get().registerResource(this);
if (ENABLE_TABLE_META_CHECKER_ENABLE) {
//如果配置开关打开,会定时在线程池不断更新表的元数据缓存信息
tableMetaExecutor.scheduleAtFixedRate(() -> {
try (Connection connection = dataSource.getConnection()) {
TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
.refresh(connection, DataSourceProxy.this.getResourceId());
} catch (Exception ignore) {
}
}, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
}

//Set the default branch type to 'AT' in the RootContext.
RootContext.setDefaultBranchType(this.getBranchType());
}
}

从上面我们可以看出,他主要做了以下几点的增强:

  1. 给每个数据源标识资源组 ID
  2. 如果打开配置,会有一个定时线程池定时更新表的元数据信息并缓存到本地
  3. 生成代理连接 ConnectionProxy 对象

在这三个增强功能里面,第三个是最重要的,在 AT 模式里面,会自动记录 undoLog,资源锁定,都是通过 ConnectionProxy 完成的,除此之外 DataSrouceProxy 重写了一个方法 getConnection ,因为这里返回的是一个 ConnectionProxy ,而不是原生的 Connection

1
2
3
4
5
6
7
8
9
10
11
@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}

@Override
public ConnectionProxy getConnection(String username, String password) throws SQLException {
Connection targetConnection = targetDataSource.getConnection(username, password);
return new ConnectionProxy(this, targetConnection);
}

# ConnectionProxy

1
ConnectionProxy` 继承 `AbstractConnectionProxy` ,在这个父类中有很多公用的方法,在这个父类有 `PreparedStatementProxy` 、`StatementProxy` 、`DataSourceProxy

img

所以我们需要先来看一下 AbstractConnectionProxy ,因为这里封装了需要我们用到的通用方法和逻辑,在其中我们需要关注的主要在于 PreparedStatementProxyStatementProxy ,在这里的逻辑主要是数据源连接的步骤,连接获取,创建执行对象等等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
public Statement createStatement() throws SQLException {
//调用真实连接对象获取Statement对象
Statement targetStatement = getTargetConnection().createStatement();
//创建Statement的代理
return new StatementProxy(this, targetStatement);
}

@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
//获取数据库类型 mysql/oracle
String dbType = getDbType();
// support oracle 10.2+
PreparedStatement targetPreparedStatement = null;
//如果是AT模式且开启全局事务
if (BranchType.AT == RootContext.getBranchType()) {
List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
//获取表的元数据
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
//得到表的主键列名
String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
}
}
}
if (targetPreparedStatement == null) {
targetPreparedStatement = getTargetConnection().prepareStatement(sql);
}
//创建PreparedStatementProxy代理
return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}

在这两个代理对象中,都用到了以下几个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public ResultSet executeQuery(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);
}

@Override
public int executeUpdate(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);
}

@Override
public boolean execute(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, (statement, args) -> statement.execute((String) args[0]), sql);
}

在这些方法中都调用了 ExecuteTemplate.execute() ,所以我们就看一下在 ExecuteTemplate 类中具体是做了什么操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class ExecuteTemplate {

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
//如果没有全局锁,并且不是AT模式,直接执行SQL
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}

//得到数据库类型- mysql/oracle
String dbType = statementProxy.getConnectionProxy().getDbType();
if (CollectionUtils.isEmpty(sqlRecognizers)) {
//sqlRecognizers 为SQL语句的解析器,获取执行的SQL,通过它可以获得SQL语句表名、相关的列名、类型等信息,最后解析出对应的SQL表达式
sqlRecognizers = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);
}
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) {
//如果seata没有找到合适的SQL语句解析器,那么便创建简单执行器PlainExecutor
//PlainExecutor直接使用原生的Statment对象执行SQL
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
//新增
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
//修改
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
//删除
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
//加锁
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
//插入加锁
case INSERT_ON_DUPLICATE_UPDATE:
switch (dbType) {
case JdbcConstants.MYSQL:
case JdbcConstants.MARIADB:
executor =
new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
default:
throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
}
break;
//原生
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
//批量处理SQL语句
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try {
//执行
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException) ex;
}
return rs;
}

}

ExecuteTemplate 就一个 execute() ,Seata 将 SQL 执行委托给不同的执行器 (模板),Seata 提供了 6 种执行器也就是我们代码 case 中( INSERTUPDATEDELETESELECT_FOR_UPDATE , INSERT_ON_DUPLICATE_UPDATE ),这些执行器的父类都是 AbstractDMLBaseExecutor

  • UpdateExecutor : 执行 update 语句
  • InsertExecutor : 执行 insert 语句
  • DeleteExecutor : 执行 delete 语句
  • SelectForUpdateExecutor : 执行 select for update 语句
  • PlainExecutor : 执行普通查询语句
  • MultiExecutor : 复合执行器,在一条 SQL 语句中执行多条语句

关系图如下:

img

然后我们找到 rs = executor.execute(args); 最终执行的方法,找到最顶级的父类 BaseTransactionalExecutor.execute()

1
2
3
4
5
6
7
8
9
10
11
@Override
public T execute(Object... args) throws Throwable {
String xid = RootContext.getXID();
if (xid != null) {
//获取XID
statementProxy.getConnectionProxy().bind(xid);
}
//设置全局锁
statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
return doExecute(args);
}

在根据 doExecute(args); 找到其中的重写方法 AbstractDMLBaseExecutor.doExecute()

1
2
3
4
5
6
7
8
9
10
@Override
public T doExecute(Object... args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
//是否自动提交
if (connectionProxy.getAutoCommit()) {
return executeAutoCommitTrue(args);
} else {
return executeAutoCommitFalse(args);
}
}

对于数据库而言,本身都是自动提交的,所以我们进入 executeAutoCommitTrue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
//设置为手动提交
connectionProxy.changeAutoCommit();
return new LockRetryPolicy(connectionProxy).execute(() -> {
//调用手动提交方法,得到分支执行的最终结果
T result = executeAutoCommitFalse(args);
//执行提交
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}

connectionProxy.changeAutoCommit()`方法,修改为手动提交后,我们看来最关键的代码`executeAutoCommitFalse()
protected T executeAutoCommitFalse(Object[] args) throws Exception {
if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
throw new NotSupportYetException("multi pk only support mysql!");
}
//获取前镜像
TableRecords beforeImage = beforeImage();
//执行具体业务
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
//获取执行数量
int updateCount = statementProxy.getUpdateCount();
//判断如果执行数量大于0
if (updateCount > 0) {
//获取后镜像
TableRecords afterImage = afterImage(beforeImage);
//暂存到undolog中,在Commit的时候保存到数据库
prepareUndoLog(beforeImage, afterImage);
}
return result;
}

我们再回到 executeAutoCommitTrue 中,去看看提交做了哪些操作 connectionProxy.commit();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void commit() throws SQLException {
try {
lockRetryPolicy.execute(() -> {
//具体执行
doCommit();
return null;
});
} catch (SQLException e) {
if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
rollback();
}
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}

进入到 doCommit()

1
2
3
4
5
6
7
8
9
10
private void doCommit() throws SQLException {
//判断是否存在全局事务
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}

作为分布式事务,一定是存在全局事务的,所以我们进入 processGlobalTransactionCommit()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void processGlobalTransactionCommit() throws SQLException {
try {
//注册分支事务
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
//写入数据库undolog
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
//执行原生提交 一阶段提交
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}

其中 register() 方法就是注册分支事务的方法,同时还会将 undoLog 写入数据库和执行提交等操作

1
2
3
4
5
6
7
8
9
10
//注册分支事务,生成分支事务ID
private void register() throws TransactionException {
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}
//注册分支事务
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
null, context.getXid(), context.getApplicationData(), context.buildLockKeys());
context.setBranchId(branchId);
}

然后我们在回到 processGlobalTransactionCommit 中,看看写入数据库中的 flushUndoLogs()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
ConnectionContext connectionContext = cp.getContext();
if (!connectionContext.hasUndoLog()) {
return;
}
//获取XID
String xid = connectionContext.getXid();
//获取分支ID
long branchId = connectionContext.getBranchId();

BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchId);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
}

CompressorType compressorType = CompressorType.NONE;
if (needCompress(undoLogContent)) {
compressorType = ROLLBACK_INFO_COMPRESS_TYPE;
undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent);
}
//写入数据库具体位置
insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection());
}

具体写入方法,此时我们使用的是 MySql,所以执行的是 MySql 实现类 MySQLUndoLogManager.insertUndoLogWithNormal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
protected void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
Connection conn) throws SQLException {
insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn);
}

//具体写入操作
private void insertUndoLog(String xid, long branchId, String rollbackCtx, byte[] undoLogContent,
State state, Connection conn) throws SQLException {
try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) {
pst.setLong(1, branchId);
pst.setString(2, xid);
pst.setString(3, rollbackCtx);
pst.setBytes(4, undoLogContent);
pst.setInt(5, state.getValue());
pst.executeUpdate();
} catch (Exception e) {
if (!(e instanceof SQLException)) {
e = new SQLException(e);
}
throw (SQLException) e;
}
}

具体流程如下所示:

img

# Seata 服务端

我们找到 Server.java 这里就是启动入口,在这个入口中找到协调者,因为 TC 整体的操作就是协调整体的全局事务

1
2
//默认协调者
DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);

DefaultCoordinator 类中我们找到 一个 doGlobalBegin 这个就是处理全局事务开始的方法,以及全局提交 doGlobalCommit 和全局回滚 doGlobalRollback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    //处理全局事务
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)throws TransactionException {
//响应客户端xid
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
}
}

//处理全局提交
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
response.setGlobalStatus(core.commit(request.getXid()));
}

//处理全局回滚
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
RpcContext rpcContext) throws TransactionException {
MDC.put(RootContext.MDC_KEY_XID, request.getXid());
response.setGlobalStatus(core.rollback(request.getXid()));
}

在这里我们首先关注 doGlobalBegincore.begin()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
//创建全局事务Session
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid());

//为Session重添加回调监听,SessionHolder.getRootSessionManager() 获取一个全局Session管理器DataBaseSessionManager
//观察者设计模式,创建DataBaseSessionManager
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

//全局事务开始
session.begin();

// transaction start event
MetricsPublisher.postSessionDoingEvent(session, false);

return session.getXid();
}

然后我们在来看一下 SessionHolder.getRootSessionManager()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Gets root session manager.
* 获取一个全局Session管理器
* @return the root session manager
*/
public static SessionManager getRootSessionManager() {
if (ROOT_SESSION_MANAGER == null) {
throw new ShouldNeverHappenException("SessionManager is NOT init!");
}
return ROOT_SESSION_MANAGER;
}

public static void init(String mode) {
if (StringUtils.isBlank(mode)) {
mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE,
CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE));
}
StoreMode storeMode = StoreMode.get(mode);
//判断Seata模式,当前为DB
if (StoreMode.DB.equals(storeMode)) {
//通过SPI机制读取SessionManager接口实现类,读取的META-INF.services目录,在通过反射机制创建对象DataBaseSessionManager
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
........
}
}

在这里他其实读取的是 DB 模式下 io.seata.server.session.SessionManager 文件的内容

img

我们在回到 begin 方法中,去查看 session.begin()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void begin() throws TransactionException {
//声明全局事务开始
this.status = GlobalStatus.Begin;
//开始时间
this.beginTime = System.currentTimeMillis();
//激活全局事务
this.active = true;
//将SessionManager放入到集合中,调用onBegin方法
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
//调用父级抽象类的方法
lifecycleListener.onBegin(this);
}
}

这里我们来看一下 onBegin() 方法,调用的是父级的方法,在这其中我们要关注 addGlobalSession() 方法,但是要注意,这里我们用的是 db 模式所以调用的是 db 模式的 DateBaseSessionManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public void onBegin(GlobalSession globalSession) throws TransactionException {
//这里调用的是DateBaseSessionManager
addGlobalSession(globalSession);
}

@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
if (StringUtils.isBlank(taskName)) {
//写入session
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
if (!ret) {
throw new StoreException("addGlobalSession failed.");
}
} else {
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
if (!ret) {
throw new StoreException("addGlobalSession failed.");
}
}
}

然后在看查询其中关键的方法 DataBaseTransactionStoreManager.writeSession()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
//第一次进入是写入 会进入当前方法
//全局添加
if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
//全局修改
} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
//全局删除
} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
//分支添加
} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
//分支更新
} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
//分支移除
} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else {
throw new StoreException("Unknown LogOperation:" + logOperation.name());
}
}

我们就看第一次进去的方法 logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
Connection conn = null;
PreparedStatement ps = null;
try {
int index = 1;
conn = logStoreDataSource.getConnection();
conn.setAutoCommit(true);
ps = conn.prepareStatement(sql);
ps.setString(index++, globalTransactionDO.getXid());
ps.setLong(index++, globalTransactionDO.getTransactionId());
ps.setInt(index++, globalTransactionDO.getStatus());
ps.setString(index++, globalTransactionDO.getApplicationId());
ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
String transactionName = globalTransactionDO.getTransactionName();
transactionName = transactionName.length() > transactionNameColumnSize ?
transactionName.substring(0, transactionNameColumnSize) :
transactionName;
ps.setString(index++, transactionName);
ps.setInt(index++, globalTransactionDO.getTimeout());
ps.setLong(index++, globalTransactionDO.getBeginTime());
ps.setString(index++, globalTransactionDO.getApplicationData());
return ps.executeUpdate() > 0;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps, conn);
}
}

在这里有一个 GlobalTransactionDO 对象,里面有 xid、transactionId 等等,到这里是不是就很熟悉了、

img

还记得我们第一次使用 Seata 的时候会创建三张表

  1. branch_table 分支事务表
  2. global_table 全局事务表
  3. lock_table 全局锁表

而这里就是对应我们的 global_table 表,其他两个也是差不多,都是一样的操作

img
流程图如下:
img

# 总结

完整流程图:

img

对于 Seata 源码来说主要是了解从哪里入口以及核心点在哪里,遇到有疑问的,可以 Debug,对于 Seata AT 模式,我们主要掌握的核心点是

  • 如何获取全局锁、开启全局事务
  • 解析 SQL 并写入 undolog

# 关于我

Brath 是一个热爱技术的 Java 程序猿,公众号「InterviewCoder」定期分享有趣有料的精品原创文章!

InterviewCoder

非常感谢各位人才能看到这里,原创不易,文章如果有帮助可以关注、点赞、分享或评论,这都是对我的莫大支持!

评论