# 【Seata】来自阿里开源分布式事务框架 - Seata 的原理
首先 Seata
客户端启动一般分为以下几个流程:
自动加载 Bean 属性和配置信息
初始化 TM
初始化 RM
初始化分布式事务客户端完成,完成代理数据库配置
连接 TC (Seata 服务端),注册 RM 和 TM
开启全局事务
在这篇源码的讲解中,我们主要以 AT 模式为主导,官网也是主推 AT 模式,我们在上篇的文章中也讲解过,感兴趣的小伙伴可以去看一看分布式事务 (Seata) 四大模式详解 ,在官网中也提供了对应的流程地址:https://seata.io/zh-cn/docs/dev/mode/at-mode.html ,在这里我们只是做一些简单的介绍,AT 模式主要分为两个阶段:
一阶段:
解析 SQL,获取 SQL 类型(CRUD)、表信息、条件 (where) 等相关信息
查询前镜像 (改变之前的数据),根据解析得到的条件信息,生成查询语句,定位数据
执行业务 SQL,更新数据
查询后镜像(改变后的数据),根据前镜像的结果,通过主键都给你为数据
插入回滚日志,将前后镜像数据以及业务 SQL 等信息,组织成一条回滚日志记录,插入到 undo Log 表中
提交前,向 TC 注册分支,申请全局锁
本地事务提交,业务数据的更细腻和生成的 undoLog 一起提交
将本地事务提交的结果通知给 TC
二阶段:
如果 TC 收到的是回滚请求
开启本地事务,通过 XID 和 BranchID 查找到对应的 undo Log 记录
根据 undoLog 中的前镜像和业务 SQL 的相关信息生成并执行回滚语句
提交本地事务,将本地事务的执行结果(分支事务回滚的信息)通知给 TC
如果没问题,执行提交操作
收到 TC 分支提交请求,将请求放入到一个异步任务的队列中,马上返回提交成功的结果给 TC
异步任务阶段的分支提交请求删除 undoLog 中记录
# 源码入口
接下来,我们就需要从官网中去下载源码,下载地址:https://seata.io/zh-cn/blog/download.html,选择 source
即可,下载完成之后,通过 IDEA 打开项目。
源码下载下来之后,我们应该如何去找入口呢?首先我们需要找到对应引入的 Seata
包 spring-alibaba-seata
,我们在回想一下,我们开启事务的时候,是不是添加过一个 @GlobalTransactional
的注解,这个注解就是我们入手的一个点,我们在 spring.factories
中看到有一个 GlobalTransactionAutoConfiguration
,这个就是我们需要关注的点,也就是我们源码的入口
在 GlobalTransactionAutoConfiguration
中我们找到一个用 Bean 注入的方法 globalTransactionScanner
,这个就是全局事务扫描器,这个类型主要负责加载配置,注入相关的 Bean
这里给大家展示了当前 GlobalTransactionScanner 的类关系图,其中我们现在继承了 Aop 的 AbstractAutoProxyCreator 类型,在这其中有一个重点方法,这个方法就是判断 Bean 对象是否需要代理,是否需要增强。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Configuration @EnableConfigurationProperties(SeataProperties.class) public class GlobalTransactionAutoConfiguration { @Bean public GlobalTransactionScanner globalTransactionScanner () { String applicationName = applicationContext.getEnvironment() .getProperty("spring.application.name" ); String txServiceGroup = seataProperties.getTxServiceGroup(); if (StringUtils.isEmpty(txServiceGroup)) { txServiceGroup = applicationName + "-fescar-service-group" ; seataProperties.setTxServiceGroup(txServiceGroup); } return new GlobalTransactionScanner (applicationName, txServiceGroup); } }
在这其中我们要关心的是 GlobalTransactionScanner
这个类型,这个类型扫描 @GlobalTransactional
注解,并对代理方法进行拦截增强事务的功能。我们就从源码中搜索这个 GlobalTransactionScanner
类,看看里面具体是做了什么
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements ConfigurationChangeListener , InitializingBean, ApplicationContextAware, DisposableBean { private final String applicationId; private final String txServiceGroup; private void initClient () { if (LOGGER.isInfoEnabled()) { LOGGER.info("Initializing Global Transaction Clients ... " ); } if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) { LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " + "please change your default configuration as soon as possible " + "and we don't recommend you to use default tx-service-group's value provided by seata" , DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP); } if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) { throw new IllegalArgumentException (String.format("applicationId: %s, txServiceGroup: %s" , applicationId, txServiceGroup)); } TMClient.init(applicationId, txServiceGroup, accessKey, secretKey); if (LOGGER.isInfoEnabled()) { LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]" , applicationId, txServiceGroup); } RMClient.init(applicationId, txServiceGroup); if (LOGGER.isInfoEnabled()) { LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]" , applicationId, txServiceGroup); } if (LOGGER.isInfoEnabled()) { LOGGER.info("Global Transaction Clients are initialized. " ); } registerSpringShutdownHook(); } @Override public void afterPropertiesSet () { if (disableGlobalTransaction) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global transaction is disabled." ); } ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this ); return ; } if (initialized.compareAndSet(false , true )) { initClient(); } } private void initClient () { if (LOGGER.isInfoEnabled()) { LOGGER.info("Initializing Global Transaction Clients ... " ); } if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) { LOGGER.warn("the default value of seata.tx-service-group: {} has already changed to {} since Seata 1.5, " + "please change your default configuration as soon as possible " + "and we don't recommend you to use default tx-service-group's value provided by seata" , DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP); } if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) { throw new IllegalArgumentException (String.format("applicationId: %s, txServiceGroup: %s" , applicationId, txServiceGroup)); } TMClient.init(applicationId, txServiceGroup, accessKey, secretKey); if (LOGGER.isInfoEnabled()) { LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]" , applicationId, txServiceGroup); } RMClient.init(applicationId, txServiceGroup); if (LOGGER.isInfoEnabled()) { LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]" , applicationId, txServiceGroup); } if (LOGGER.isInfoEnabled()) { LOGGER.info("Global Transaction Clients are initialized. " ); } registerSpringShutdownHook(); } @Override protected Object wrapIfNecessary (Object bean, String beanName, Object cacheKey) { if (!doCheckers(bean, beanName)) { return bean; } try { synchronized (PROXYED_SET) { if (PROXYED_SET.contains(beanName)) { return bean; } interceptor = null ; if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext); interceptor = new TccActionInterceptor (TCCBeanParserUtils.getRemotingDesc(beanName)); ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor); } else { Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean); Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean); if (!existsAnnotation(new Class []{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) { return bean; } if (globalTransactionalInterceptor == null ) { globalTransactionalInterceptor = new GlobalTransactionalInterceptor (failureHandlerHook); ConfigurationCache.addConfigListener( ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor); } interceptor = globalTransactionalInterceptor; } LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]" , bean.getClass().getName(), beanName, interceptor.getClass().getName()); if (!AopUtils.isAopProxy(bean)) { bean = super .wrapIfNecessary(bean, beanName, cacheKey); } else { AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean); Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null , null , null )); int pos; for (Advisor avr : advisor) { pos = findAddSeataAdvisorPosition(advised, avr); advised.addAdvisor(pos, avr); } } PROXYED_SET.add(beanName); return bean; } } catch (Exception exx) { throw new RuntimeException (exx); } } } InitializingBean`:中实现了一个 `afterPropertiesSet()`方法,在这个方法中,调用了`initClient()
AbstractAutoProxyCreator
:APO 动态代理,在之前的的 Nacos 和 Sentiel 中都有这个代理类,AOP 在我们越往深入学习,在学习源码的会见到的越来越多,越来越重要,很多相关代理,都是通过 AOP 进行增强,在这个类中,我们需要关注有一个 wrapIfNecessary()
方法, 这个方法主要是判断被代理的 bean 或者类是否需要代理增强,在这个方法中会调用 GlobalTransactionalInterceptor.invoke()
进行带来增强。
具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public class GlobalTransactionalInterceptor implements ConfigurationChangeListener , MethodInterceptor, SeataInterceptor { public GlobalTransactionalInterceptor (FailureHandler failureHandler) { this .failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler; this .disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION); this .order = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER); degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK); if (degradeCheck) { ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this ); degradeCheckPeriod = ConfigurationFactory.getInstance() .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD); degradeCheckAllowTimes = ConfigurationFactory.getInstance() .getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES); EVENT_BUS.register(this ); if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0 ) { startDegradeCheck(); } } this .initDefaultGlobalTransactionTimeout(); } @Override public Object invoke (final MethodInvocation methodInvocation) throws Throwable { Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null ; Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) { final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class); final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class); boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes); if (!localDisable) { if (globalTransactionalAnnotation != null || this .aspectTransactional != null ) { AspectTransactional transactional; if (globalTransactionalAnnotation != null ) { transactional = new AspectTransactional (globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.propagation(), globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes()); } else { transactional = this .aspectTransactional; } return handleGlobalTransaction(methodInvocation, transactional); } else if (globalLockAnnotation != null ) { return handleGlobalLock(methodInvocation, globalLockAnnotation); } } } return methodInvocation.proceed(); } }
具体流程图如下所示:
# 核心源码
在上面我们讲解到 GlobalTransactionalInterceptor
作为全局事务拦截器,一旦执行拦截,就会进入 invoke 方法,其中,我们会做 @GlobalTransactional
注解的判断,如果有这个注解的存在,会执行全局事务和全局锁,再执行全局事务的时候会调用 handleGlobalTransaction
全局事务处理器,获取事务信息,那我们接下来就来看一下 GlobalTransactionalInterceptor.handleGlobalTransaction
到底是如何执行全局事务的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 Object handleGlobalTransaction (final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable { boolean succeed = true ; try { return transactionalTemplate.execute(new TransactionalExecutor () { @Override public Object execute () throws Throwable { return methodInvocation.proceed(); } public String name () { String name = aspectTransactional.getName(); if (!StringUtils.isNullOrEmpty(name)) { return name; } return formatMethod(methodInvocation.getMethod()); } @Override public TransactionInfo getTransactionInfo () { int timeout = aspectTransactional.getTimeoutMills(); if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) { timeout = defaultGlobalTransactionTimeout; } TransactionInfo transactionInfo = new TransactionInfo (); transactionInfo.setTimeOut(timeout); transactionInfo.setName(name()); transactionInfo.setPropagation(aspectTransactional.getPropagation()); transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval()); transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes()); Set<RollbackRule> rollbackRules = new LinkedHashSet <>(); for (Class<?> rbRule : aspectTransactional.getRollbackFor()) { rollbackRules.add(new RollbackRule (rbRule)); } for (String rbRule : aspectTransactional.getRollbackForClassName()) { rollbackRules.add(new RollbackRule (rbRule)); } for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) { rollbackRules.add(new NoRollbackRule (rbRule)); } for (String rbRule : aspectTransactional.getNoRollbackForClassName()) { rollbackRules.add(new NoRollbackRule (rbRule)); } transactionInfo.setRollbackRules(rollbackRules); return transactionInfo; } }); } catch (TransactionalExecutor.ExecutionException e) { TransactionalExecutor.Code code = e.getCode(); switch (code) { case RollbackDone: throw e.getOriginalException(); case BeginFailure: succeed = false ; failureHandler.onBeginFailure(e.getTransaction(), e.getCause()); throw e.getCause(); case CommitFailure: succeed = false ; failureHandler.onCommitFailure(e.getTransaction(), e.getCause()); throw e.getCause(); case RollbackFailure: failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException()); throw e.getOriginalException(); case RollbackRetrying: failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException()); throw e.getOriginalException(); default : throw new ShouldNeverHappenException (String.format("Unknown TransactionalExecutor.Code: %s" , code)); } } finally { if (degradeCheck) { EVENT_BUS.post(new DegradeCheckEvent (succeed)); } } }
在这里我们,主要关注一个重点方法 execute()
,这个方法主要用来执行事务的具体流程:
获取事务信息
执行全局事务
发生异常全局回滚,各个数据通过 UndoLog 进行事务补偿
全局事务提交
清除所有资源
这个位置也是一个非常核心的一个位置,因为我们所有的业务进来以后都会去走这个位置,具体源码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 public Object execute (TransactionalExecutor business) throws Throwable { TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null ) { throw new ShouldNeverHappenException ("transactionInfo does not exist" ); } GlobalTransaction tx = GlobalTransactionContext.getCurrent(); Propagation propagation = txInfo.getPropagation(); SuspendedResourcesHolder suspendedResourcesHolder = null ; try { switch (propagation) { case NOT_SUPPORTED: if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); } return business.execute(); case REQUIRES_NEW: if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); tx = GlobalTransactionContext.createNew(); } break ; case SUPPORTS: if (notExistingTransaction(tx)) { return business.execute(); } break ; case REQUIRED: break ; case NEVER: if (existingTransaction(tx)) { throw new TransactionException ( String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s" , tx.getXid())); } else { return business.execute(); } case MANDATORY: if (notExistingTransaction(tx)) { throw new TransactionException ("No existing transaction found for transaction marked with propagation 'mandatory'" ); } break ; default : throw new TransactionException ("Not Supported Propagation:" + propagation); } if (tx == null ) { tx = GlobalTransactionContext.createNew(); } GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo); try { beginTransaction(txInfo, tx); Object rs; try { rs = business.execute(); } catch (Throwable ex) { completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } commitTransaction(tx); return rs; } finally { resumeGlobalLockConfig(previousConfig); triggerAfterCompletion(); cleanUp(); } } finally { if (suspendedResourcesHolder != null ) { tx.resume(suspendedResourcesHolder); } } }
这其中的第三步和第四步其实在向 TC(Seata-Server)发起全局事务的提交或者回滚,在这里我们首先关注执行全局事务的 beginTransaction()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 private void beginTransaction (TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { triggerBeforeBegin(); tx.begin(txInfo.getTimeOut(), txInfo.getName()); triggerAfterBegin(); } catch (TransactionException txe) { throw new TransactionalExecutor .ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } }
在来关注其中,向 TC 发起请求的 tx.begin()
方法,而调用 begin()
方法的类为: DefaultGlobalTransaction
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Override public void begin (int timeout, String name) throws TransactionException { if (role != GlobalTransactionRole.Launcher) { assertXIDNotNull(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]" , xid); } return ; } assertXIDNull(); String currentXid = RootContext.getXID(); if (currentXid != null ) { throw new IllegalStateException ("Global transaction already exists," + " can't begin a new global transaction, currentXid = " + currentXid); } xid = transactionManager.begin(null , null , name, timeout); status = GlobalStatus.Begin; RootContext.bind(xid); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction [{}]" , xid); } }
再来看一下 transactionManager.begin()
方法,这个时候使用的是 DefaultTransactionManager.begin
默认的事务管理者,来获取 XID,传入事务相关的信息 ,最好 TC 返回对应的全局事务 XID,它调用的是 DefaultTransactionManager.begin()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 public String begin (String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { GlobalBeginRequest request = new GlobalBeginRequest (); request.setTransactionName(name); request.setTimeout(timeout); GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request); if (response.getResultCode() == ResultCode.Failed) { throw new TmTransactionException (TransactionExceptionCode.BeginFailed, response.getMsg()); } return response.getXid(); }
在这里我们需要关注一个 syncCall
,在这里采用的是 Netty 通讯方式
1 2 3 4 5 6 7 8 private AbstractTransactionResponse syncCall (AbstractTransactionRequest request) throws TransactionException { try { return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request); } catch (TimeoutException toe) { throw new TmTransactionException (TransactionExceptionCode.IO, "RPC timeout" , toe); } }
具体图解如下:
在这里我们需要重点了解 GlobalTransactionScanner
这个类型,在这个类型中继承了一些接口和抽象类,这个类主要作用就是扫描有注解的 Bean,并做 AOP 增强。
ApplicationContextAware
:继承这个类型以后,需要实现其方法 setApplicationContext()
,当 Spring 启动完成以后,会自动调用这个类型,将 ApplicationContext
给 bean
,也就是说, GlobalTransactionScanner
能够很自然的使用 Spring 环境
InitializingBean
: 继承这个接口,需要实现 afterPropertiesSet()
,但凡是继承这个接口的类,在初始化的时候,当所有的 properties
设置完成以后,会执行这个方法
DisposableBean
: 这个类,实现了一个 destroy()
这个方法是在销毁的时候去调用
AbstractAutoProxyCreator
: 这个类是 Spring 实现 AOP 的一种方式,本质上是一个 BeanPostProcessor
,在 Bean 初始化至去年,调用内部 createProxy()
,创建一个 Bean 的 AOP 代理 Bean 并返回,对 Bean 进行增强。
# Seata 数据源代理
在上面的环节中,我们讲解了 Seata AT 模式 2PC 的执行流程,那么现在我们就来带大家了解一下关于 AT 数据源代理的信息,这也是 AT 模式中非常关键的一个重要知识点,大家可以拿起小本子,记下来。
首先 AT 模式的核心主要分为一下两个
开启全局事务,获取全局锁。
解析 SQL 并写入 undoLog 中。
关于第一点我们已经分析清楚了,第二点就是关于 AT 模式如何解析 SQL 并写入 undoLog 中,但是在这之前,我们需要知道 Seata 是如何选择数据源,并进行数据源代理的。虽然全局事务拦截成功后最终还是执行了业务方法进行 SQL 提交和操作,但是由于 Seata 对数据源进行了代理,所以 SQL 的解析和 undoLog 的操作,是在数据源代理中进行完成的。
数据源代理是 Seata 中一个非常重要的知识点,在分布式事务 运行过程中,undoLog 的记录、资源的锁定,用户都是无感知的,因为这些操作都是数据源的代理中完成了,恰恰是这样,我们才要去了解,这样不仅有利于我们了解 Seata 的核心操作,还能对以后源码阅读有所帮助,因为其实很多底层代码都会去使用这样用户无感知的方式 (代理) 去实现。
同样,我们在之前的寻找源码入口的时候,通过我们项目中引入的 jar 找到一个 SeataAutoConfiguration
类,我们在里面找到一个 SeataDataSourceBeanPostProcessor()
,这个就是我们数据源代理的入口方法
我们进入 SeataDataSourceBeanPostProcessor
类里面,发现继承了一个 BeanPostProcessor
, 这个接口我们应该很熟悉,这个是 Sprng 的拓展接口,所有的 Bean 对象,都有进入两个方法 postProcessAfterInitialization()
和 postProcessBeforeInitialization()
这两个方法都是由 BeanPostProcessor
提供的,这两个方法,一个是初始化之前执行 Before
。一个是在初始化之后执行 After
,主要用来对比我们的的 Bean 是否为数据源代理对象。
在这里我们需要关注到一个 postProcessAfterInitialization.proxyDataSource()
方法,这个里面
1 2 3 4 5 6 7 8 9 private Object proxyDataSource (Object originBean) { DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) originBean); if (this .useJdkProxy) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), SpringProxyUtils.getAllInterfaces(originBean), (proxy, method, args) -> handleMethodProxy(dataSourceProxy, method, args, originBean)); } else { return Enhancer.create(originBean.getClass(), (MethodInterceptor) (proxy, method, args, methodProxy) -> handleMethodProxy(dataSourceProxy, method, args, originBean)); } }
这里有一个 DataSourceProxy
代理对象,我们需要看的就是这个类,这个就是我们数据库代理的对象,我们从我们下载的源码项目中,搜索这个代理对象,当我们打开这个类的目录时发现,除了这个,还有 ConnectionProxy
连接对象、 StatementProxy
、 PreparedStatementProxy
SQL 执行对象,这些都被 Seata 进行了代理,为什么要对这些都进行代理,代理的目的其实为了执行 Seata 的业务逻辑,生成 undoLog,全局事务的开启,事务的提交回滚等操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 DataSourceProxy` 具体做了什么,主要功能有哪些,我们来看一下。他在源码中是如何体现的,我们需要关注的是`init() public class DataSourceProxy extends AbstractDataSourceProxy implements Resource { private String resourceGroupId; private void init(DataSource dataSource, String resourceGroupId) { //资源组ID,默认是“default”这个默认值 this.resourceGroupId = resourceGroupId; try (Connection connection = dataSource.getConnection()) { //根据原始数据源得到JDBC连接和数据库类型 jdbcUrl = connection.getMetaData().getURL(); dbType = JdbcUtils.getDbType(jdbcUrl); if (JdbcConstants.ORACLE.equals(dbType)) { userName = connection.getMetaData().getUserName(); } else if (JdbcConstants.MARIADB.equals(dbType)) { dbType = JdbcConstants.MYSQL; } } catch (SQLException e) { throw new IllegalStateException("can not init dataSource", e); } initResourceId(); DefaultResourceManager.get().registerResource(this); if (ENABLE_TABLE_META_CHECKER_ENABLE) { //如果配置开关打开,会定时在线程池不断更新表的元数据缓存信息 tableMetaExecutor.scheduleAtFixedRate(() -> { try (Connection connection = dataSource.getConnection()) { TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType()) .refresh(connection, DataSourceProxy.this.getResourceId()); } catch (Exception ignore) { } }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS); } //Set the default branch type to 'AT' in the RootContext. RootContext.setDefaultBranchType(this.getBranchType()); } }
从上面我们可以看出,他主要做了以下几点的增强:
给每个数据源标识资源组 ID
如果打开配置,会有一个定时线程池定时更新表的元数据信息并缓存到本地
生成代理连接 ConnectionProxy
对象
在这三个增强功能里面,第三个是最重要的,在 AT 模式里面,会自动记录 undoLog,资源锁定,都是通过 ConnectionProxy
完成的,除此之外 DataSrouceProxy
重写了一个方法 getConnection
,因为这里返回的是一个 ConnectionProxy
,而不是原生的 Connection
1 2 3 4 5 6 7 8 9 10 11 @Override public ConnectionProxy getConnection () throws SQLException { Connection targetConnection = targetDataSource.getConnection(); return new ConnectionProxy (this , targetConnection); } @Override public ConnectionProxy getConnection (String username, String password) throws SQLException { Connection targetConnection = targetDataSource.getConnection(username, password); return new ConnectionProxy (this , targetConnection); }
# ConnectionProxy
1 ConnectionProxy` 继承 `AbstractConnectionProxy` ,在这个父类中有很多公用的方法,在这个父类有 `PreparedStatementProxy` 、`StatementProxy` 、`DataSourceProxy
所以我们需要先来看一下 AbstractConnectionProxy
,因为这里封装了需要我们用到的通用方法和逻辑,在其中我们需要关注的主要在于 PreparedStatementProxy
和 StatementProxy
,在这里的逻辑主要是数据源连接的步骤,连接获取,创建执行对象等等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Override public Statement createStatement () throws SQLException { Statement targetStatement = getTargetConnection().createStatement(); return new StatementProxy (this , targetStatement); } @Override public PreparedStatement prepareStatement (String sql) throws SQLException { String dbType = getDbType(); PreparedStatement targetPreparedStatement = null ; if (BranchType.AT == RootContext.getBranchType()) { List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType); if (sqlRecognizers != null && sqlRecognizers.size() == 1 ) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0 ); if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) { TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(), sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId()); String[] pkNameArray = new String [tableMeta.getPrimaryKeyOnlyName().size()]; tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray); targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray); } } } if (targetPreparedStatement == null ) { targetPreparedStatement = getTargetConnection().prepareStatement(sql); } return new PreparedStatementProxy (this , targetPreparedStatement, sql); }
在这两个代理对象中,都用到了以下几个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public ResultSet executeQuery (String sql) throws SQLException { this .targetSQL = sql; return ExecuteTemplate.execute(this , (statement, args) -> statement.executeQuery((String) args[0 ]), sql); } @Override public int executeUpdate (String sql) throws SQLException { this .targetSQL = sql; return ExecuteTemplate.execute(this , (statement, args) -> statement.executeUpdate((String) args[0 ]), sql); } @Override public boolean execute (String sql) throws SQLException { this .targetSQL = sql; return ExecuteTemplate.execute(this , (statement, args) -> statement.execute((String) args[0 ]), sql); }
在这些方法中都调用了 ExecuteTemplate.execute()
,所以我们就看一下在 ExecuteTemplate
类中具体是做了什么操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 public class ExecuteTemplate { public static <T, S extends Statement > T execute (List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException { if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) { return statementCallback.execute(statementProxy.getTargetStatement(), args); } String dbType = statementProxy.getConnectionProxy().getDbType(); if (CollectionUtils.isEmpty(sqlRecognizers)) { sqlRecognizers = SQLVisitorFactory.get( statementProxy.getTargetSQL(), dbType); } Executor<T> executor; if (CollectionUtils.isEmpty(sqlRecognizers)) { executor = new PlainExecutor <>(statementProxy, statementCallback); } else { if (sqlRecognizers.size() == 1 ) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0 ); switch (sqlRecognizer.getSQLType()) { case INSERT: executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class []{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object []{statementProxy, statementCallback, sqlRecognizer}); break ; case UPDATE: executor = new UpdateExecutor <>(statementProxy, statementCallback, sqlRecognizer); break ; case DELETE: executor = new DeleteExecutor <>(statementProxy, statementCallback, sqlRecognizer); break ; case SELECT_FOR_UPDATE: executor = new SelectForUpdateExecutor <>(statementProxy, statementCallback, sqlRecognizer); break ; case INSERT_ON_DUPLICATE_UPDATE: switch (dbType) { case JdbcConstants.MYSQL: case JdbcConstants.MARIADB: executor = new MySQLInsertOrUpdateExecutor (statementProxy, statementCallback, sqlRecognizer); break ; default : throw new NotSupportYetException (dbType + " not support to INSERT_ON_DUPLICATE_UPDATE" ); } break ; default : executor = new PlainExecutor <>(statementProxy, statementCallback); break ; } } else { executor = new MultiExecutor <>(statementProxy, statementCallback, sqlRecognizers); } } T rs; try { rs = executor.execute(args); } catch (Throwable ex) { if (!(ex instanceof SQLException)) { ex = new SQLException (ex); } throw (SQLException) ex; } return rs; } }
在 ExecuteTemplate
就一个 execute()
,Seata 将 SQL 执行委托给不同的执行器 (模板),Seata 提供了 6 种执行器也就是我们代码 case 中( INSERT
, UPDATE
, DELETE
, SELECT_FOR_UPDATE
, INSERT_ON_DUPLICATE_UPDATE
),这些执行器的父类都是 AbstractDMLBaseExecutor
UpdateExecutor
: 执行 update 语句
InsertExecutor
: 执行 insert 语句
DeleteExecutor
: 执行 delete 语句
SelectForUpdateExecutor
: 执行 select for update 语句
PlainExecutor
: 执行普通查询语句
MultiExecutor
: 复合执行器,在一条 SQL 语句中执行多条语句
关系图如下:
然后我们找到 rs = executor.execute(args);
最终执行的方法,找到最顶级的父类 BaseTransactionalExecutor.execute()
1 2 3 4 5 6 7 8 9 10 11 @Override public T execute (Object... args) throws Throwable { String xid = RootContext.getXID(); if (xid != null ) { statementProxy.getConnectionProxy().bind(xid); } statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock()); return doExecute(args); }
在根据 doExecute(args);
找到其中的重写方法 AbstractDMLBaseExecutor.doExecute()
1 2 3 4 5 6 7 8 9 10 @Override public T doExecute (Object... args) throws Throwable { AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); if (connectionProxy.getAutoCommit()) { return executeAutoCommitTrue(args); } else { return executeAutoCommitFalse(args); } }
对于数据库而言,本身都是自动提交的,所以我们进入 executeAutoCommitTrue()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 protected T executeAutoCommitTrue (Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { connectionProxy.changeAutoCommit(); return new LockRetryPolicy (connectionProxy).execute(() -> { T result = executeAutoCommitFalse(args); connectionProxy.commit(); return result; }); } catch (Exception e) { LOGGER.error("execute executeAutoCommitTrue error:{}" , e.getMessage(), e); if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); } throw e; } finally { connectionProxy.getContext().reset(); connectionProxy.setAutoCommit(true ); } } connectionProxy.changeAutoCommit()`方法,修改为手动提交后,我们看来最关键的代码`executeAutoCommitFalse() protected T executeAutoCommitFalse (Object[] args) throws Exception { if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) { throw new NotSupportYetException ("multi pk only support mysql!" ); } TableRecords beforeImage = beforeImage(); T result = statementCallback.execute(statementProxy.getTargetStatement(), args); int updateCount = statementProxy.getUpdateCount(); if (updateCount > 0 ) { TableRecords afterImage = afterImage(beforeImage); prepareUndoLog(beforeImage, afterImage); } return result; }
我们再回到 executeAutoCommitTrue
中,去看看提交做了哪些操作 connectionProxy.commit();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public void commit () throws SQLException { try { lockRetryPolicy.execute(() -> { doCommit(); return null ; }); } catch (SQLException e) { if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) { rollback(); } throw e; } catch (Exception e) { throw new SQLException (e); } }
进入到 doCommit()
中
1 2 3 4 5 6 7 8 9 10 private void doCommit () throws SQLException { if (context.inGlobalTransaction()) { processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { processLocalCommitWithGlobalLocks(); } else { targetConnection.commit(); } }
作为分布式事务,一定是存在全局事务的,所以我们进入 processGlobalTransactionCommit()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private void processGlobalTransactionCommit () throws SQLException { try { register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e, context.buildLockKeys()); } try { UndoLogManagerFactory.getUndoLogManager(this .getDbType()).flushUndoLogs(this ); targetConnection.commit(); } catch (Throwable ex) { LOGGER.error("process connectionProxy commit error: {}" , ex.getMessage(), ex); report(false ); throw new SQLException (ex); } if (IS_REPORT_SUCCESS_ENABLE) { report(true ); } context.reset(); }
其中 register()
方法就是注册分支事务的方法,同时还会将 undoLog 写入数据库和执行提交等操作
1 2 3 4 5 6 7 8 9 10 private void register () throws TransactionException { if (!context.hasUndoLog() || !context.hasLockKey()) { return ; } Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(), null , context.getXid(), context.getApplicationData(), context.buildLockKeys()); context.setBranchId(branchId); }
然后我们在回到 processGlobalTransactionCommit
中,看看写入数据库中的 flushUndoLogs()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Override public void flushUndoLogs (ConnectionProxy cp) throws SQLException { ConnectionContext connectionContext = cp.getContext(); if (!connectionContext.hasUndoLog()) { return ; } String xid = connectionContext.getXid(); long branchId = connectionContext.getBranchId(); BranchUndoLog branchUndoLog = new BranchUndoLog (); branchUndoLog.setXid(xid); branchUndoLog.setBranchId(branchId); branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems()); UndoLogParser parser = UndoLogParserFactory.getInstance(); byte [] undoLogContent = parser.encode(branchUndoLog); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Flushing UNDO LOG: {}" , new String (undoLogContent, Constants.DEFAULT_CHARSET)); } CompressorType compressorType = CompressorType.NONE; if (needCompress(undoLogContent)) { compressorType = ROLLBACK_INFO_COMPRESS_TYPE; undoLogContent = CompressorFactory.getCompressor(compressorType.getCode()).compress(undoLogContent); } insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName(), compressorType), undoLogContent, cp.getTargetConnection()); }
具体写入方法,此时我们使用的是 MySql,所以执行的是 MySql 实现类 MySQLUndoLogManager.insertUndoLogWithNormal()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override protected void insertUndoLogWithNormal (String xid, long branchId, String rollbackCtx, byte [] undoLogContent, Connection conn) throws SQLException { insertUndoLog(xid, branchId, rollbackCtx, undoLogContent, State.Normal, conn); } private void insertUndoLog (String xid, long branchId, String rollbackCtx, byte [] undoLogContent, State state, Connection conn) throws SQLException { try (PreparedStatement pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL)) { pst.setLong(1 , branchId); pst.setString(2 , xid); pst.setString(3 , rollbackCtx); pst.setBytes(4 , undoLogContent); pst.setInt(5 , state.getValue()); pst.executeUpdate(); } catch (Exception e) { if (!(e instanceof SQLException)) { e = new SQLException (e); } throw (SQLException) e; } }
具体流程如下所示:
# Seata 服务端
我们找到 Server.java
这里就是启动入口,在这个入口中找到协调者,因为 TC 整体的操作就是协调整体的全局事务
1 2 DefaultCoordinator coordinator = DefaultCoordinator.getInstance(nettyRemotingServer);
在 DefaultCoordinator
类中我们找到 一个 doGlobalBegin
这个就是处理全局事务开始的方法,以及全局提交 doGlobalCommit
和全局回滚 doGlobalRollback
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Override protected void doGlobalBegin (GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout())); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}" , rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid()); } } @Override protected void doGlobalCommit (GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); response.setGlobalStatus(core.commit(request.getXid())); } @Override protected void doGlobalRollback (GlobalRollbackRequest request, GlobalRollbackResponse response, RpcContext rpcContext) throws TransactionException { MDC.put(RootContext.MDC_KEY_XID, request.getXid()); response.setGlobalStatus(core.rollback(request.getXid())); }
在这里我们首先关注 doGlobalBegin
中 core.begin()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public String begin (String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout); MDC.put(RootContext.MDC_KEY_XID, session.getXid()); session.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); session.begin(); MetricsPublisher.postSessionDoingEvent(session, false ); return session.getXid(); }
然后我们在来看一下 SessionHolder.getRootSessionManager()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public static SessionManager getRootSessionManager () { if (ROOT_SESSION_MANAGER == null ) { throw new ShouldNeverHappenException ("SessionManager is NOT init!" ); } return ROOT_SESSION_MANAGER; } public static void init (String mode) { if (StringUtils.isBlank(mode)) { mode = CONFIG.getConfig(ConfigurationKeys.STORE_SESSION_MODE, CONFIG.getConfig(ConfigurationKeys.STORE_MODE, SERVER_DEFAULT_STORE_MODE)); } StoreMode storeMode = StoreMode.get(mode); if (StoreMode.DB.equals(storeMode)) { ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName()); ........ } }
在这里他其实读取的是 DB 模式下 io.seata.server.session.SessionManager
文件的内容
我们在回到 begin
方法中,去查看 session.begin()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public void begin () throws TransactionException { this .status = GlobalStatus.Begin; this .beginTime = System.currentTimeMillis(); this .active = true ; for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onBegin(this ); } }
这里我们来看一下 onBegin()
方法,调用的是父级的方法,在这其中我们要关注 addGlobalSession()
方法,但是要注意,这里我们用的是 db 模式所以调用的是 db 模式的 DateBaseSessionManager
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public void onBegin (GlobalSession globalSession) throws TransactionException { addGlobalSession(globalSession); } @Override public void addGlobalSession (GlobalSession session) throws TransactionException { if (StringUtils.isBlank(taskName)) { boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session); if (!ret) { throw new StoreException ("addGlobalSession failed." ); } } else { boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session); if (!ret) { throw new StoreException ("addGlobalSession failed." ); } } }
然后在看查询其中关键的方法 DataBaseTransactionStoreManager.writeSession()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Override public boolean writeSession (LogOperation logOperation, SessionStorable session) { if (LogOperation.GLOBAL_ADD.equals(logOperation)) { return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) { return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) { return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); } else if (LogOperation.BRANCH_ADD.equals(logOperation)) { return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) { return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) { return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else { throw new StoreException ("Unknown LogOperation:" + logOperation.name()); } }
我们就看第一次进去的方法 logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override public boolean insertGlobalTransactionDO (GlobalTransactionDO globalTransactionDO) { String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable); Connection conn = null ; PreparedStatement ps = null ; try { int index = 1 ; conn = logStoreDataSource.getConnection(); conn.setAutoCommit(true ); ps = conn.prepareStatement(sql); ps.setString(index++, globalTransactionDO.getXid()); ps.setLong(index++, globalTransactionDO.getTransactionId()); ps.setInt(index++, globalTransactionDO.getStatus()); ps.setString(index++, globalTransactionDO.getApplicationId()); ps.setString(index++, globalTransactionDO.getTransactionServiceGroup()); String transactionName = globalTransactionDO.getTransactionName(); transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0 , transactionNameColumnSize) : transactionName; ps.setString(index++, transactionName); ps.setInt(index++, globalTransactionDO.getTimeout()); ps.setLong(index++, globalTransactionDO.getBeginTime()); ps.setString(index++, globalTransactionDO.getApplicationData()); return ps.executeUpdate() > 0 ; } catch (SQLException e) { throw new StoreException (e); } finally { IOUtil.close(ps, conn); } }
在这里有一个 GlobalTransactionDO
对象,里面有 xid、transactionId
等等,到这里是不是就很熟悉了、
还记得我们第一次使用 Seata 的时候会创建三张表
branch_table 分支事务表
global_table 全局事务表
lock_table 全局锁表
而这里就是对应我们的 global_table
表,其他两个也是差不多,都是一样的操作
流程图如下:
# 总结
完整流程图:
对于 Seata 源码来说主要是了解从哪里入口以及核心点在哪里,遇到有疑问的,可以 Debug,对于 Seata AT 模式,我们主要掌握的核心点是
如何获取全局锁、开启全局事务
解析 SQL 并写入 undolog
# 关于我
Brath 是一个热爱技术的 Java 程序猿,公众号「InterviewCoder」定期分享有趣有料的精品原创文章!
非常感谢各位人才能看到这里,原创不易,文章如果有帮助可以关注、点赞、分享或评论,这都是对我的莫大支持!