seata AT模式实战 (图解

您所在的位置:网站首页 accountnotinstore seata AT模式实战 (图解

seata AT模式实战 (图解

#seata AT模式实战 (图解| 来源: 网络整理| 查看: 265

文章很长,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈 奉上以下珍贵的学习资源:

免费赠送 经典图书:《Java高并发核心编程(卷1)》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

免费赠送 经典图书:《Java高并发核心编程(卷2)》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

免费赠送 经典图书:《Netty Zookeeper Redis 高并发实战》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

免费赠送 经典图书:《SpringCloud Nginx高并发核心编程》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领

免费赠送 资源宝库: Java 必备 百度网盘资源大合集 价值>10000元 加尼恩领取

推荐:入大厂 、做架构、大力提升Java 内功 的 精彩博文 入大厂 、做架构、大力提升Java 内功 必备的精彩博文 2021 秋招涨薪1W + 必备的精彩博文 1:Redis 分布式锁 (图解-秒懂-史上最全) 2:Zookeeper 分布式锁 (图解-秒懂-史上最全) 3: Redis与MySQL双写一致性如何保证? (面试必备) 4: 面试必备:秒杀超卖 解决方案 (史上最全) 5:面试必备之:Reactor模式 6: 10分钟看懂, Java NIO 底层原理 7:TCP/IP(图解+秒懂+史上最全) 8:Feign原理 (图解) 9:DNS图解(秒懂 + 史上最全 + 高薪必备) 10:CDN图解(秒懂 + 史上最全 + 高薪必备) 11: 分布式事务( 图解 + 史上最全 + 吐血推荐 ) 12:seata AT模式实战(图解+秒懂+史上最全) 13:seata 源码解读(图解+秒懂+史上最全) 14:seata TCC模式实战(图解+秒懂+史上最全) Java 面试题 30个专题 , 史上最全 , 面试必刷 阿里、京东、美团... 随意挑、横着走!!! 1: JVM面试题(史上最强、持续更新、吐血推荐) 2:Java基础面试题(史上最全、持续更新、吐血推荐 3:架构设计面试题 (史上最全、持续更新、吐血推荐) 4:设计模式面试题 (史上最全、持续更新、吐血推荐) 17、分布式事务面试题 (史上最全、持续更新、吐血推荐) 一致性协议 (史上最全) 29、多线程面试题(史上最全) 30、HR面经,过五关斩六将后,小心阴沟翻船! 9.网络协议面试题(史上最全、持续更新、吐血推荐) 更多专题, 请参见【 疯狂创客圈 高并发 总目录 】 SpringCloud 精彩博文 nacos 实战(史上最全) sentinel (史上最全+入门教程) SpringCloud gateway (史上最全) 更多专题, 请参见【 疯狂创客圈 高并发 总目录 】 seata AT模式实战(图解+秒懂+史上最全)

阅读此文之前,请先阅读 :

分布式事务( 图解 + 史上最全 + 吐血推荐 )

参考链接 系统架构知识图谱(一张价值10w的系统架构知识图谱)

https://www.processon.com/view/link/60fb9421637689719d246739

秒杀系统的架构

https://www.processon.com/view/link/61148c2b1e08536191d8f92f

先来看下为什么会产生分布式事务问题

分布式事务使用场景

简单来说:

一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题

作为典型案例,搬出经典的银行转账问题:

需求:

假设银行(bank)中有两个客户(name)张三和李四, 我们需要将张三的1000元存款(sal)转到李四的账户上

约束:

不能出现中间状态,张三减1000,李四没加 , 或者 反之

在这里插入图片描述

如果两个用户对应的银行存款数据在一个数据源中,即一个数据库中,那么通过spring框架下的@Transactional注解来保证单一数据源增删改查的一致性。

数据库的水平分割倒逼分布式事务

但是随着业务的不断扩大,用户数在不断变多,几百万几千万用户时数据可以存一个库甚至一个表里,假设有10个亿的用户?

一个表当然放不下,需要分表,当然需要分库来配合。

为了解决数据库上的瓶颈,分库是很常见的解决方案,不同用户就可能落在不同的数据库里,原来一个库里的事务操作,现在变成了跨数据库的事务操作。 在这里插入图片描述

此时@Transactional注解就失效了,这就是跨数据库分布式事务问题

微服务化倒逼分布式事务

当然,更多的情形是随着业务不断增长,将业务中不同模块服务拆分成微服务后,同时调用多个微服务所产生的

设想一个传统的单体应用,无论多少内部调用,最后终归是在同一个数据库上进行操作来完成一向业务操作,如图:

img

随着业务量的发展,业务需求和架构发生了巨大的变化,整体架构由原来的单体应用逐渐拆分成为了微服务,

原来的3个服务被从一个单体架构上拆开了,成为了3个独立的服务,分别使用独立的数据源,也不在之前共享同一个数据源了,具体的业务将由三个服务的调用来完成,如图:

img

此时,每一个服务的内部数据一致性仍然有本地事务来保证。

但是面对整个业务流程上的事务应该如何保证呢?这就是在微服务架构下面临的挑战,如何保证在微服务中的数据一致性。

微服务化的银行转账情景往往是这样的

调用交易系统服务创建交易订单;

调用支付系统记录支付明细;

调用账务系统执行 A 扣钱;

调用账务系统执行 B 加钱;

调用账务系统执行 B 加钱;

在这里插入图片描述

如图所示,每个系统都对应一个独立的数据源,且可能位于不同机房,同时调用多个系统的服务很难保证同时成功,这就是跨服务分布式事务问题

10WQPS秒杀实操的分库架构

在这里插入图片描述

Spring Cloud Alibaba Seata

解决分布式事务问题,有两个设计初衷

对业务无侵入:即减少技术架构上的微服务化所带来的分布式事务问题对业务的侵入 高性能:减少分布式事务解决方案所带来的性能消耗

seata中常用的有两种分布式事务实现方案,AT及TCC

AT模式主要关注多 DB 访问的数据一致性,当然也包括多服务下的多 DB 数据访问一致性问题 TCC 模式主要关注业务拆分,在按照业务横向扩展资源时,解决微服务间调用的一致性问题 AT模式(业务侵入小)

Seata AT模式是基于XA事务演进而来的一个分布式事务中间件,

XA是一个基于数据库实现的分布式事务协议,本质上和两阶段提交一样,需要数据库支持,Mysql5.6以上版本支持XA协议,其他数据库如Oracle,DB2也实现了XA接口

AT模式角色如下

在这里插入图片描述

Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚

Transaction Manager ™:

控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议

Resource Manager (RM):

控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚

AT模式(2PC)基本处理逻辑如下

在这里插入图片描述 Branch就是指的分布式事务中每个独立的本地局部事务

AT模式第一阶段

Seata 的 JDBC 数据源代理通过对业务 SQL 的解析,把业务数据在更新前后的数据镜像组织成回滚日志,利用 本地事务 的 ACID 特性,将业务数据的更新和回滚日志的写入在同一个 本地事务 中提交。

这样,可以保证:任何提交的业务数据的更新一定有相应的回滚日志存在

在这里插入图片描述

基于这样的机制,分支的本地事务便可以在全局事务的第一阶段提交,并马上释放本地事务锁定的资源

这也是Seata和XA事务的不同之处:

经典的2PC两阶段提交(XA)往往对资源的锁定需要持续到第二阶段实际的提交或者回滚操作,

在这里插入图片描述

AT模式,可以在第一阶段释放对资源的锁定,降低了锁范围

谁的功劳:回滚日志

AT模式第二阶段

在这里插入图片描述

场景一:提交,全局提交

如果决议是全局提交,此时分支事务此时已经完成提交,不需要同步协调处理(只需要异步清理回滚日志),Phase2 可以非常快速地完成 在这里插入图片描述

场景2:回滚,全局回滚

如果决议是全局回滚,RM 收到协调器发来的回滚请求,通过 XID 和 Branch ID 找到相应的回滚日志记录,通过回滚记录生成反向的更新 SQL 并执行,以完成分支的回滚 在这里插入图片描述

AT模式相对于XA模式的优势

在这里插入图片描述

提高效率,即使第二阶段发生异常需要回滚,只需找对undolog中对应数据并反解析成sql来达到回滚目的

同时Seata无入侵,通过代理数据源将业务sql的执行解析成undolog来与业务数据的更新同时入库,达到了对业务无侵入的效果

10WQPS秒杀实操的AT分布式事务架构

在这里插入图片描述

快速开始搭建Seata环境 全局事务和分支事务的存储模式

因为 TC 需要进行全局事务和分支事务的记录,所以需要对应的存储。

目前,TC 有两种存储模式( store.mode ):

file 模式:适合单机模式,全局事务会话信息在内存中读写,并持久化本地文件 root.data,性能较高。 db 模式:适合集群模式,全局事务会话信息通过 db 共享,相对性能差点。 file 模式

全局事务会话信息在内存中读写

并持久化本地文件 root.data

file 模式,最终部署单机 TC Server 如下图所示:

c97604a80f3dad9727473e9350a753be.png

db 模式

集群 Seata TC Server,实现高可用,生产环境下必备。

在集群时,多个 Seata TC Server 通过 db 数据库,实现全局事务会话信息的共享。

每个 Seata TC Server 可以注册自己到注册中心上,方便应用从注册中心获得到他们。 集群 TC Server 如下图所示:

d6fd48e578b11107e6fd321e389dbe06.png

Seata TC Server 对主流的注册中心都提供了集成。国内使用 Nacos 作为注册中心越来越流行,推荐使用nacos。

配置和启动Seata-server服务(TC服务)

seata 官方文档地址:

http://seata.io/zh-cn/docs/overview/what-is-seata.html

下载seata-server-1.3.0和seata-1.3.0源码

seate-server下载: https://seata.io/zh-cn/blog/download.html,

seata-1.3.0源码下载:

https://github.com/seata/seata/releases

https://github.com/seata/seata/releases/download/v1.3.0/seata-server-1.3.0.tar.gz

https://gitee.com/seata-io/seata.git

下载所有的源码后,切换到1.3的分支

seata-server包下载和解压

第一步:https://github.com/seata/seata/releases 下载seata-server包

推荐使用1.3.0版本,最新版本有些配套的依赖包,不一定来得及更新,可能会出现一些奇怪的问题

seata-server-1.3 上传后,大致如下:

[root@cdh1 ~]# cd /work/ [root@cdh1 work]# ll total 333360 -rw-r--r-- 1 root root 33959771 Sep 13 16:00 seata-server-1.3.0.tar.gz drwxr-xr-x 5 root root 87 Dec 26 2020 zookeeper

然后解压缩,tar 命令如下:

[root@cdh1 ~]# rm -rf /work/seata [root@cdh1 ~]# cd /work/ [root@cdh1 work]# tar -zxvf seata-server-1.3.0.tar.gz seata/LICENSE seata/conf/ seata/conf/META-INF/ seata/conf/META-INF/services/ seata/conf/logback.xml seata/conf/file.conf seata/conf/registry.conf seata/conf/META-INF/services/io.seata.server.session.SessionManager seata/conf/META-INF/services/io.seata.core.rpc.RegisterCheckAuthHandler seata/conf/META-INF/services/io.seata.core.store.db.DataSourceProvider seata/conf/META-INF/services/io.seata.server.coordinator.AbstractCore seata/conf/META-INF/servi...... 启动 TC Server(单体的实例)

bin下执行 sh ./seata-server.sh 命令,启动 TC Server 在后台。

我们看到如下日志,说明启动成功:

[root@cdh1 bin]# cd /work/seata/bin/ [root@cdh1 bin]# sh bin/seata-server.sh sh: bin/seata-server.sh: No such file or directory [root@cdh1 bin]# sh ./seata-server.sh Java HotSpot(TM) 64-Bit Server VM warning: Cannot open file /work/seata/logs/seata_gc.log due to No such file or directory ..... 2021-09-15 04:47:36.143 WARN [main]i.s.c.l.EnhancedServiceLoader$InnerEnhancedServiceLoader.loadFile:482 -The same extension io.seata.server.storage.file.lock.FileLockManager has already been loaded, skipped 2021-09-15 04:47:36.248 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ... 默认配置下,Seata TC Server 启动在 8091 端点。 使用 File 存储器全局会话

因为我们没有修改任何配置文件,默认情况seata使用的是file模式进行数据持久化,所以可以看到用于持久化的本地文件 root.data;操作命令如下:

[root@cdh1 bin]# ll -ls /work/seata/bin/sessionStore/ total 0 0 -rw-r--r-- 1 root root 0 Sep 15 11:47 root.data seata-server的TC端配置

file.conf 配置文件,是RM(各大微服务的)和TC之间的通信配置

TC端的参数清单,大致如下: key desc remark transaction.undo.log.save.days undo保留天数 默认7天,log_status=1(附录3)和未正常清理的undo transaction.undo.log.delete.period undo清理线程间隔时间 默认86400000,单位毫秒 service.max.commit.retry.timeout 二阶段提交重试超时时长 单位ms,s,m,h,d,对应毫秒,秒,分,小时,天,默认毫秒。默认值-1表示无限重试。公式: timeout>=now-globalTransactionBeginTime,true表示超时则不再重试 service.max.rollback.retry.timeout 二阶段回滚重试超时时长 同commit recovery.committing-retry-period 二阶段提交未完成状态全局事务重试提交线程间隔时间 默认1000,单位毫秒 recovery.asyn-committing-retry-period 二阶段异步提交状态重试提交线程间隔时间 默认1000,单位毫秒 recovery.rollbacking-retry-period 二阶段回滚状态重试回滚线程间隔时间 默认1000,单位毫秒 recovery.timeout-retry-period 超时状态检测重试线程间隔时间 默认1000,单位毫秒,检测出超时将全局事务置入回滚会话管理器 store.mode 事务会话信息存储方式 file本地文件(不支持HA),db数据库(支持HA) store.file.dir file模式文件存储文件夹名 默认sessionStore store.db.datasource db模式数据源类型 默认dbcp store.db.db-type db模式数据库类型 默认mysql store.db.driver-class-name db模式数据库驱动 默认com.mysql.jdbc.Driver store.db.url db模式数据源库url 默认jdbc:mysql://127.0.0.1:3306/seata store.db.user db模式数据库账户 默认mysql store.db.min-conn db模式数据库初始连接数 默认1 store.db.max-conn db模式数据库最大连接数 默认3 store.db.global.table db模式全局事务表名 默认global_table store.db.branch.table db模式分支事务表名 默认branch_table store.db.lock-table db模式全局锁表名 默认lock_table store.db.query-limit db模式查询全局事务一次的最大条数 默认1000 metrics.enabled 是否启用Metrics 默认false关闭,在False状态下,所有与Metrics相关的组件将不会被初始化,使得性能损耗最低 metrics.registry-type 指标注册器类型 Metrics使用的指标注册器类型,默认为内置的compact(简易)实现,这个实现中的Meter仅使用有限内存计数,性能高足够满足大多数场景;目前只能设置一个指标注册器实现 metrics.exporter-list 指标结果Measurement数据输出器列表 默认prometheus,多个输出器使用英文逗号分割,例如"prometheus,jmx",目前仅实现了对接prometheus的输出器 metrics.exporter-prometheus-port prometheus输出器Client端口号 默认9898 部署集群 TC Server

本小节,我们来学习部署集群 Seata TC Server,实现高可用,生产环境下必备。

在集群时,多个 Seata TC Server 通过 db 数据库,实现全局事务会话信息的共享。

每个 Seata TC Server 可以注册自己到注册中心上,方便应用从注册中心获得到他们。最终我们部署 集群 TC Server 如下图所示:

在这里插入图片描述

使用 Nacos 作为注册中心

修改 conf/registry.conf 配置文件,设置使用 Nacos 注册中心。如下图所示:

registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa # type = "file" type = "nacos" nacos { application = "seata-server" serverAddr = "localhost" namespace = "" cluster = "default" username = "nacos" password = "nacos" } eureka { serviceUrl = "http://localhost:8761/eureka" application = "default" weight = "1" } redis { serverAddr = "localhost:6379" db = 0 password = "" cluster = "default" timeout = 0 } zk { cluster = "default" serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } }

如果设置的 -t 参数,表示设置了命名空间,则需要提前创建命名空间。然后拿到其 命名空间ID---- uuid。

在这里插入图片描述

TC端数据库初始化

下载TC端的配置文件,可以从github:

https://github.com/seata-io/seata/tree/v1.3.0/script/server/db

或者 gitee:

https://gitee.com/seata-io/seata/tree/v1.3.0/script/server/db

下载Seata1.3的版本对应的脚本。目前支持mysql、oracle、postgresql这三种数据库,

上述三种脚本是针对Seata的Sever端在协调处理分布式事务时所需要的3张表,提供了不同数据库的global_table表、branch_table表、lock_table表创建脚本,根据自身数据库执行对应的sql脚本执行即可。

这里以mysql为例,在你的mysql数据库中创建名为seata的库,并执行以下sql,将会生成三张表:global_table表、branch_table表、lock_table表

在这里插入图片描述

创建TC端的专属独立库seata。

连接MYSQL:

格式: mysql -h主机地址 -u用户名 -p用户密码

mysql -h192.168.9.1 -uroot -p123456 mysql -uroot -p123456 /usr/bin/mysql -uroot -p"123456" --connect-expired-password use seata; Database changed mysql> source /work/seata/conf/db_store.sql; Query OK, 0 rows affected, 1 warning (0.00 sec) Query OK, 0 rows affected (0.02 sec) Query OK, 0 rows affected, 1 warning (0.00 sec) Query OK, 0 rows affected (0.00 sec) Query OK, 0 rows affected, 1 warning (0.00 sec) Query OK, 0 rows affected (0.01 sec)

这里我们只是做演示,理论上上面三个业务服务应该分属不同的数据库,这里我们只是在同一台数据库下面创建三个 Schema ,分别为 db_account 、 db_order 和 db_storage ,具体如图:

mysql> show tables; +-----------------+ | Tables_in_seata | +-----------------+ | branch_table | | global_table | | lock_table | +-----------------+ 3 rows in set (0.00 sec)

通过windows 工具操作,也是类似的,导入 db_store.sql之后,创建了3个表 db_account 、 db_order 和 db_storage ,具体如图:

10bc6bbb40535db878403d34014d156b.png

db 数据库共享全局事务会话信息: 如果使用file作为配置文件

修改 conf/file.conf 配置文件,修改使用 db 数据库,实现 Seata TC Server 的全局事务会话信息的共享。

如下图所示:

## transaction log store, only used in seata-server store { ## store mode: file、db ## mode = "file" mode = "db" ## file store property file { ## store location dir dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions maxBranchSessionSize = 16384 # globe session size , if exceeded throws exceptions maxGlobalSessionSize = 512 # file buffer size , if exceeded allocate new buffer fileWriteBufferCacheSize = 16384 # when recover batch read size sessionReloadReadSize = 100 # async, sync flushDiskMode = async } ## database store property db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "mysql" ## driverClassName = "com.mysql.jdbc.Driver" driverClassName = "com.mysql.jdbc.Driver" url = "jdbc:mysql://192.168.56.121:3306/seata?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&useSSL=true&serverTimezone=UTC" user = "root" password = "123456" minConn = 5 maxConn = 30 globalTable = "global_table" branchTable = "branch_table" lockTable = "lock_table" queryLimit = 100 maxWait = 5000 } } 如何使用nacos作为配置中心:

修改使用 db 数据库,实现 Seata TC Server 的全局事务会话信息的共享。

transport.type=TCP transport.server=NIO transport.heartbeat=true transport.enableClientBatchSendRequest=false transport.threadFactory.bossThreadPrefix=NettyBoss transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler transport.threadFactory.shareBossWorker=false transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector transport.threadFactory.clientSelectorThreadSize=1 transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread transport.threadFactory.bossThreadSize=1 transport.threadFactory.workerThreadSize=default transport.shutdown.wait=3 service.vgroupMapping.my_test_tx_group=default service.vgroupMapping.seata-seckill-demo-seata-service-group=default service.vgroupMapping.seata-order-demo-seata-service-group=default service.vgroupMapping.seata-stock-demo-seata-service-group=default service.default.grouplist=cdh1:8091 service.enableDegrade=false service.disableGlobalTransaction=false client.rm.asyncCommitBufferLimit=10000 client.rm.lock.retryInterval=10 client.rm.lock.retryTimes=30 client.rm.lock.retryPolicyBranchRollbackOnConflict=true client.rm.reportRetryCount=5 client.rm.tableMetaCheckEnable=false client.rm.sqlParserType=druid client.rm.reportSuccessEnable=false client.rm.sagaBranchRegisterEnable=false client.tm.commitRetryCount=5 client.tm.rollbackRetryCount=5 #store.mode=file store.mode=db store.file.dir=file_store/data store.file.maxBranchSessionSize=16384 store.file.maxGlobalSessionSize=512 store.file.fileWriteBufferCacheSize=16384 store.file.flushDiskMode=async store.file.sessionReloadReadSize=100 store.db.datasource=druid store.db.dbType=mysql store.db.driverClassName=com.mysql.jdbc.Driver store.db.url=jdbc:mysql://cdh1:3306/seata?useUnicode=true store.db.user=root store.db.password=123456 store.db.minConn=5 store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 server.recovery.committingRetryPeriod=1000 server.recovery.asynCommittingRetryPeriod=1000 server.recovery.rollbackingRetryPeriod=1000 server.recovery.timeoutRetryPeriod=1000 server.maxCommitRetryTimeout=-1 server.maxRollbackRetryTimeout=-1 server.rollbackRetryTimeoutUnlockEnable=false client.undo.dataValidation=true client.undo.logSerialization=jackson server.undo.logSaveDays=7 server.undo.logDeletePeriod=86400000 client.undo.logTable=undo_log client.log.exceptionRate=100 transport.serialization=seata transport.compressor=none metrics.enabled=false metrics.registryType=compact metrics.exporterList=prometheus metrics.exporterPrometheusPort=9898 补充:MySQL8 的支持

如果使用的 MySQL 是 8.X 版本,需要下载 MySQL 8.X JDBC 驱动,命令行操作如下:

$ cd lib $ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar

然后,修改 conf/file 配置文件,使用该 MySQL 8.X JDBC 驱动。如下图所示:

在这里插入图片描述

启动 第一个TC Server

执行 nohup sh bin/seata-server.sh -p 18091 -n 1 & 命令,启动第一个 TC Server 在后台。

-p:Seata TC Server 监听的端口。

-n:Server node。在多个 TC Server 时,需区分各自节点,用于生成不同区间的 transactionId 事务编号,以免冲突。

nohup sh /work/seata/bin/seata-server.sh -p 18091 -n 1 > /work/seata/bin/consol.log & tail -f /work/seata/bin/consol.log

在 consol.log 文件中,我们看到如下日志,说明启动成功:

e.session.FileSessionManager has already been loaded, skipped 2021-09-15 07:46:33.472 WARN [main]i.s.c.l.EnhancedServiceLoader$InnerEnhancedServiceLoader.loadFile:482 -The same extension io.seata.server.storage.db.session.DataBaseSessionManager has already been loaded, skipped ..... 2021-09-15 07:46:34.094 WARN [main]i.s.c.l.EnhancedServiceLoader$InnerEnhancedServiceLoader.loadFile:482 -The same extension io.seata.server.storage.file.lock.FileLockManager has already been loaded, skipped 2021-09-15 07:46:34.220 INFO [main]io.seata.core.rpc.netty.RpcServerBootstrap.start:155 -Server started ... 启动 第2个TC Server

执行 nohup sh bin/seata-server.sh -p 18092 -n 2 & 命令,启动第二个 TC Server 在后台。

nohup sh /work/seata/bin/seata-server.sh -p 18092 -n 1 > /work/seata/bin/consol2.log & tail -f /work/seata/bin/consol2.log 实验演示:注册到nacos之后,可以在nacos上看效果

具体演示,参见此博客的配套视频

控制台提交nacos配置脚本

下载nacos的配置文件,原始config.txt文件可以从github:

https://github.com/seata/seata/tree/develop/script/config-center

或者 gitee:

https://gitee.com/seata-io/seata/tree/develop/script/config-center

下载的并修改:

transport.type=TCP transport.server=NIO transport.heartbeat=true transport.enableClientBatchSendRequest=true transport.threadFactory.bossThreadPrefix=NettyBoss transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler transport.threadFactory.shareBossWorker=false transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector transport.threadFactory.clientSelectorThreadSize=1 transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread transport.threadFactory.bossThreadSize=1 transport.threadFactory.workerThreadSize=default transport.shutdown.wait=3 service.vgroup_mapping.my_test_tx_group=default service.vgroup_mapping.seata-seckill-demo=default service.vgroup_mapping.seata-order-demo=default service.vgroup_mapping.seata-stock-demo=default service.default.grouplist=127.0.0.1:8091 service.enableDegrade=false service.disableGlobalTransaction=false client.rm.asyncCommitBufferLimit=10000 client.rm.lock.retryInterval=10 client.rm.lock.retryTimes=30 client.rm.lock.retryPolicyBranchRollbackOnConflict=true client.rm.reportRetryCount=5 client.rm.tableMetaCheckEnable=false client.rm.tableMetaCheckerInterval=60000 client.rm.sqlParserType=druid client.rm.reportSuccessEnable=false client.rm.sagaBranchRegisterEnable=false client.rm.tccActionInterceptorOrder=-2147482648 client.tm.commitRetryCount=5 client.tm.rollbackRetryCount=5 client.tm.defaultGlobalTransactionTimeout=60000 client.tm.degradeCheck=false client.tm.degradeCheckAllowTimes=10 client.tm.degradeCheckPeriod=2000 client.tm.interceptorOrder=-2147482648 store.mode=file store.lock.mode=file store.session.mode=file store.publicKey= store.file.dir=file_store/data store.file.maxBranchSessionSize=16384 store.file.maxGlobalSessionSize=512 store.file.fileWriteBufferCacheSize=16384 store.file.flushDiskMode=async store.file.sessionReloadReadSize=100 store.db.datasource=druid store.db.dbType=mysql store.db.driverClassName=com.mysql.jdbc.Driver store.db.url=jdbc:mysql://cdh1:3306/seata?useUnicode=true&rewriteBatchedStatements=true store.db.user=root store.db.password=123456 store.db.minConn=5 store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 store.redis.mode=single store.redis.single.host=cdh1 store.redis.single.port=6379 store.redis.sentinel.masterName= store.redis.sentinel.sentinelHosts= store.redis.maxConn=10 store.redis.minConn=1 store.redis.maxTotal=100 store.redis.database=0 store.redis.password=123456 store.redis.queryLimit=100 server.recovery.committingRetryPeriod=1000 server.recovery.asynCommittingRetryPeriod=1000 server.recovery.rollbackingRetryPeriod=1000 server.recovery.timeoutRetryPeriod=1000 server.maxCommitRetryTimeout=-1 server.maxRollbackRetryTimeout=-1 server.rollbackRetryTimeoutUnlockEnable=false server.distributedLockExpireTime=10000 client.undo.dataValidation=true client.undo.logSerialization=jackson client.undo.onlyCareUpdateColumns=true server.undo.logSaveDays=7 server.undo.logDeletePeriod=86400000 client.undo.logTable=undo_log client.undo.compress.enable=true client.undo.compress.type=zip client.undo.compress.threshold=64k log.exceptionRate=100 transport.serialization=seata transport.compressor=none metrics.enabled=false metrics.registryType=compact metrics.exporterList=prometheus metrics.exporterPrometheusPort=9898 修改 自己定义的服务组

在 nacos-config.txt 文件 修改 自己定义的服务组 ,参考如下:

service.vgroup_mapping.my_test_tx_group=default service.vgroup_mapping.seata-seckill-demo=default service.vgroup_mapping.seata-order-demo=default service.vgroup_mapping.seata-stock-demo=default

中间的${your-service-gruop}为自己定义的服务组名称,这里需要我们在程序的配置文件中配置,笔者这里直接使用程序的 spring.application.name。

修改数据库连接

用到2个文件,nacos-config.txt和nacos-config.sh,都在config目录下。

config.txt需要:

修改数据库连接 store.db.datasource=druid store.db.dbType=mysql store.db.driverClassName=com.mysql.jdbc.Driver # 这里 store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true # 这里 store.db.user=username # 这里 store.db.password=password # 这里 将文件中的配置导入nacos 即可

下载nacos的配置文件,原始config.txt文件可以从github:

https://github.com/seata/seata/tree/develop/script/config-center

或者 gitee:

https://gitee.com/seata-io/seata/tree/develop/script/config-center

提供的nacos脚本nacos-config.sh,将上面的config.txt文件复制到seata的config目录,通过 nacos-config.sh ,将文件中的配置导入nacos 即可。执行以下命令:

sh /work/seata/script/config-center/nacos/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca -u nacos -w nacos

将以上信息提交到nacos控制台,当然,如果有需要修改参数,可直接通过登录nacos控制台修改。

[root@cdh1 script]# sh /work/seata/script/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca -u nacos -w nacos set nacosAddr=localhost:8848 set group=SEATA_GROUP Set transport.type=TCP successfully Set transport.server=NIO successfully Set transport.heartbeat=true successfully Set transport.enableClientBatchSendRequest=true successfully Set transport.threadFactory.bossThreadPrefix=NettyBoss successfully Set transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker successfully Set transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler successfully Set transport.threadFactory.shareBossWorker=false successfully Set transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector successfully Set transport.threadFactory.clientSelectorThreadSize=1 successfully Set transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread successfully Set transport.threadFactory.bossThreadSize=1 successfully Set transport.threadFactory.workerThreadSize=default successfully Set transport.shutdown.wait=3 successfully Set service.vgroup_mapping.my_test_tx_group=default successfully Set service.vgroup_mapping.seata-seckill-demo=default successfully Set service.vgroup_mapping.seata-order-demo=default successfully Set service.vgroup_mapping.seata-stock-demo=default successfully Set service.default.grouplist=127.0.0.1:8091 successfully Set service.enableDegrade=false successfully Set service.disableGlobalTransaction=false successfully Set client.rm.asyncCommitBufferLimit=10000 successfully Set client.rm.lock.retryInterval=10 successfully Set client.rm.lock.retryTimes=30 successfully Set client.rm.lock.retryPolicyBranchRollbackOnConflict=true successfully Set client.rm.reportRetryCount=5 successfully Set client.rm.tableMetaCheckEnable=false successfully Set client.rm.tableMetaCheckerInterval=60000 successfully Set client.rm.sqlParserType=druid successfully Set client.rm.reportSuccessEnable=false successfully Set client.rm.sagaBranchRegisterEnable=false successfully Set client.rm.tccActionInterceptorOrder=-2147482648 successfully Set client.tm.commitRetryCount=5 successfully Set client.tm.rollbackRetryCount=5 successfully Set client.tm.defaultGlobalTransactionTimeout=60000 successfully Set client.tm.degradeCheck=false successfully Set client.tm.degradeCheckAllowTimes=10 successfully Set client.tm.degradeCheckPeriod=2000 successfully Set client.tm.interceptorOrder=-2147482648 successfully Set store.mode=file successfully Set store.lock.mode=file successfully Set store.session.mode=file successfully Set store.publicKey= failure Set store.file.dir=file_store/data successfully Set store.file.maxBranchSessionSize=16384 successfully Set store.file.maxGlobalSessionSize=512 successfully Set store.file.fileWriteBufferCacheSize=16384 successfully Set store.file.flushDiskMode=async successfully Set store.file.sessionReloadReadSize=100 successfully Set store.db.datasource=druid successfully Set store.db.dbType=mysql successfully Set store.db.driverClassName=com.mysql.jdbc.Driver successfully Set store.db.url=jdbc:mysql://cdh1:3306/seata?useUnicode=true&rewriteBatchedStatements=true successfully Set store.db.user=root successfully Set store.db.password=123456 successfully Set store.db.minConn=5 successfully Set store.db.maxConn=30 successfully Set store.db.globalTable=global_table successfully Set store.db.branchTable=branch_table successfully Set store.db.queryLimit=100 successfully Set store.db.lockTable=lock_table successfully Set store.db.maxWait=5000 successfully Set store.redis.mode=single successfully Set store.redis.single.host=cdh1 successfully Set store.redis.single.port=6379 successfully Set store.redis.sentinel.masterName= failure Set store.redis.sentinel.sentinelHosts= failure Set store.redis.maxConn=10 successfully Set store.redis.minConn=1 successfully Set store.redis.maxTotal=100 successfully Set store.redis.database=0 successfully Set store.redis.password=123456 successfully Set store.redis.queryLimit=100 successfully Set server.recovery.committingRetryPeriod=1000 successfully Set server.recovery.asynCommittingRetryPeriod=1000 successfully Set server.recovery.rollbackingRetryPeriod=1000 successfully Set server.recovery.timeoutRetryPeriod=1000 successfully Set server.maxCommitRetryTimeout=-1 successfully Set server.maxRollbackRetryTimeout=-1 successfully Set server.rollbackRetryTimeoutUnlockEnable=false successfully Set server.distributedLockExpireTime=10000 successfully Set client.undo.dataValidation=true successfully Set client.undo.logSerialization=jackson successfully Set client.undo.onlyCareUpdateColumns=true successfully Set server.undo.logSaveDays=7 successfully Set server.undo.logDeletePeriod=86400000 successfully Set client.undo.logTable=undo_log successfully Set client.undo.compress.enable=true successfully Set client.undo.compress.type=zip successfully Set client.undo.compress.threshold=64k successfully Set log.exceptionRate=100 successfully Set transport.serialization=seata successfully Set transport.compressor=none successfully Set metrics.enabled=false successfully Set metrics.registryType=compact successfully Set metrics.exporterList=prometheus successfully Set metrics.exporterPrometheusPort=9898 successfully ========================================================================= Complete initialization parameters, total-count:97 , failure-count:3 ========================================================================= init nacos config fail.

顺利完成后,nacos中如下

在这里插入图片描述

如果设置的 -t 参数,表示设置了命名空间,则需要提前创建命名空间。然后拿到其 命名空间ID---- uuid。

在这里插入图片描述

直接使用nacos 的data-id配置文件 制作和上传配置文件

把上面的配置,做成一个文件,放在nacos中

在这里插入图片描述

config文件中的修改 ##配置seata-server的注册中心,支持file 、nacos 、eureka、redis、zk、consul、etcd3、sofa registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa # type = "file" type = "nacos" nacos { application = "seata-server" serverAddr = "cdh1:8848" namespace = "e385bfe2-e743-4910-8c32-e05759f9f9f4" cluster = "default" username = "nacos" password = "nacos" } eureka { serviceUrl = "http://localhost:8761/eureka" application = "default" weight = "1" } redis { serverAddr = "localhost:6379" db = 0 password = "" cluster = "default" timeout = 0 } zk { cluster = "default" serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } } ##配置seata-server的配置中心,支持file、nacos 、apollo、zk、consul、etcd3 config { # file、nacos 、apollo、zk、consul、etcd3 # type = "file" type = "nacos" #nacos { # serverAddr = "ch" # namespace = "e385bfe2-e743-4910-8c32-e05759f9f9f4" # group = "SEATA_GROUP" # username = "" # password = "" # } nacos { application = "seata-server" # serverAddr = "192.168.56.121:8848" serverAddr = "cdh1:8848" namespace = "e385bfe2-e743-4910-8c32-e05759f9f9f4" group = "SEATA_GROUP" dataId = "seata-tc.properties" cluster = "default" username = "nacos" password = "nacos" } consul { serverAddr = "127.0.0.1:8500" } apollo { appId = "seata-server" apolloMeta = "http://192.168.1.204:8801" namespace = "application" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } } 重启 TC,一切OK NameSpace、Group、DataId

nacos中提供了NameSpace、Group、DataId,他的作用是能让我们对配置文件进行分类管理,三个能够确定唯一的配置文件。我说一点,可能不同的公司,会对这3个的定义是不同的。

 比如 定义一:

    1. NameSpace:区分不同的环境

    2. Group:区分不同的项目或系统

    3. DataId:项目中的配置文件

 定义二:

    1. NameSpace:区分不同的项目

    2. Group:区分不同的模块

    3. DataId:区分不同的环境

 

 定义三:

    1. NameSpace:区分不同的租户

    2. Group:区分不同的应用

    3. DataId:区分不同的环境

 还有其他的定义,看公司。

NameSpace:区分不同的环境

一般开发都会有多套环境,如果多套环境公用一个nacos,那么配置中心和注册中心都会发生冲突,所以需要用namespace隔离开

address: 0.0.0.0 port: 8083 servlet: # 这里设置了context-path context-path: /settlement/v1 spring: application: name: settlement cloud: nacos: config: server-addr: nacos-headless.default.svc.cluster.local:8848 # 控制台创建命名空间得到的uuid namespace: c9ad103a-5420-4628-aba7-c147e3048d9d discovery: server-addr: nacos-headless.default.svc.cluster.local:8848 # 控制台创建命名空间得到的uuid namespace: c9ad103a-5420-4628-aba7-c147e3048d9d metadata: management: # 这里要适配下健康检查的endpoint context-path: '${server.servlet.context-path}/actuator' management: endpoints: web: exposure: # actuator暴露所有endpoint include: "*" 进行Group的切换

 

假设要进行Group的切换,只需要改下面的配置的值:

server: port: 3377 spring: application: name: nacos-config-client cloud: nacos: discovery: server-addr: localhost:8848 #注册中心的地址 config: server-addr: localhost:8848 #配置中心的地址 file-extension: yaml # 要读取nacos上的配置文件的后缀,这里只能是yaml,不能是yml group: TEST_GROUP

  

进行NameSpace的切换

假设要进行NameSpace的切换,只需要改下面的配置的值:

server: port: 3377 spring: application: name: nacos-config-client cloud: nacos: discovery: server-addr: localhost:8848 #注册中心的地址 namespace: 命名空间的ID config: server-addr: localhost:8848 #配置中心的地址 file-extension: yaml # 要读取nacos上的配置文件的后缀,这里只能是yaml,不能是yml namespace: 命名空间的ID

 假设同一个NameSpace、Group,要进行DataId的 profile 后缀切换,只需要改下面的配置的值:

spring: profiles: active: dev

  

AT模式的RM&TM&TC直接的交互流程

在这里插入图片描述

- 事务协调器 TC : 维护全局和分支事务的状态,指示全局提交或者回滚。 - 事务管理者 TM : 开启、提交或者回滚一个全局事务。 - 资源管理者 RM(数据库) : 管理执行分支事务的那些资源,向TC注册分支事务、上报分支事务状态、控制分支事务的提交或者回滚。

大致的流程

① TM 请求 TC, 开始一个新的全局事务,TC 会为这个全局事务生成一个 唯一XID ② XID 通过微服务的调用链传递到其他微服务。 ③ RM 向TC 注册分支事务, 将其纳入XID 对应全局事务的管辖 ; ④ TM 请求 TC 对这个 XID 进行提交或回滚 ⑤ TC 指挥这个 XID 下面的所有分支事务进行提交、回滚。

上面的流程有点复杂,如果搞不清楚,可以看下 配套视频

TM&RM 应用开发

如果你经过前面的步骤搭建Seata环境完成了,那么你可以尝试一下分布式事务的应用开发,也就是TM&RM这块。

在这里插入图片描述

业务场景

那么下面准备以Seata官方文档上的一个经典例子为题,模拟用户下单,创建订单同时扣减库存数量这一过程中产生的分布式事务问题,然后使用Seata解决,正好使用以下Seata的特性。

在这里插入图片描述

中间件版本选型

在当下微服务架构比较火热时,新一代微服务解决方案Spring Cloud Alibaba提供的开源分布式事务解决框架Seata无疑成为了我们在解决分布式事务时的首要之选,

版本选择: Spring Cloud Alibaba与Spring Boot、Spring Cloud版本对应关系

img

坑点:

如果项目中使用了druid数据库连接池,引入的是SpringBoot的Starter依赖druid-spring-boot-starter,那么需要把druid-spring-boot-starter依赖换成druid1.1.23,

因为seata源码中引入的druid依赖跟druid-spring-boot-starter的自动装配类冲突了,冲突的情况下项目启动出现异常

数据库准备

在这里插入图片描述

连接MYSQL:

格式: mysql -h主机地址 -u用户名 -p用户密码

mysql -h192.168.9.1 -uroot -p123456 mysql -uroot -p123456 创建RM端的专属独立库seata。 /usr/bin/mysql -uroot -p"123456" --connect-expired-password use seata-stock-demo; Database changed mysql>source /work/seata/conf/un_do.sql; mysql> use seata-order-demo; Database changed mysql>source /work/seata/conf/un_do.sql;

这里我们只是做演示,理论上上面三个业务服务应该分属不同的数据库,这里我们只是在同一台数据库下面创建三个 Schema ,分别为 db_account 、 db_order 和 db_storage ,具体如图:

mysql> show tables; +-----------------+ | Tables_in_seata | +-----------------+ | branch_table | | global_table | | lock_table | +-----------------+ 3 rows in set (0.00 sec) TM端的注册中心配置

下载TM端的配置文件,原始config.txt文件可以从github:

https://github.com/seata/seata/tree/develop/script/client

或者 gitee:

https://gitee.com/seata-io/seata/tree/develop/script/client

下载的并修改 register.config:

registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa、custom # type = "file" type = "nacos" nacos { application = "seata-server" serverAddr = "cdh1" namespace = "e385bfe2-e743-4910-8c32-e05759f9f9f4" cluster = "SEATA_GROUP" username = "nacos" password = "nacos" } # nacos { # application = "seata-server" # serverAddr = "127.0.0.1:8848" # group = "SEATA_GROUP" # namespace = "" # username = "" # password = "" # } eureka { serviceUrl = "http://localhost:8761/eureka" weight = "1" } redis { serverAddr = "localhost:6379" db = "0" password = "" timeout = "0" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } consul { serverAddr = "127.0.0.1:8500" aclToken = "" } etcd3 { serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } custom { name = "" } } config { # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig、custom type = "file" nacos { serverAddr = "127.0.0.1:8848" namespace = "" group = "SEATA_GROUP" username = "" password = "" dataId = "seata.properties" } consul { serverAddr = "127.0.0.1:8500" aclToken = "" } apollo { appId = "seata-server" apolloMeta = "http://192.168.1.204:8801" namespace = "application" apolloAccesskeySecret = "" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" nodePath = "/seata/seata.properties" } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } custom { name = "" } } file.conf本地配置

如果配置中心的类型都不是file时,就不需要file.conf文件了

如果配置中心的类型是file时,就需要file.conf

transport { # tcp udt unix-domain-socket type = "TCP" #NIO NATIVE server = "NIO" #enable heartbeat heartbeat = true # the client batch send request enable enableClientBatchSendRequest = true #thread factory for netty threadFactory { bossThreadPrefix = "NettyBoss" workerThreadPrefix = "NettyServerNIOWorker" serverExecutorThread-prefix = "NettyServerBizHandler" shareBossWorker = false clientSelectorThreadPrefix = "NettyClientSelector" clientSelectorThreadSize = 1 clientWorkerThreadPrefix = "NettyClientWorkerThread" # netty boss thread size,will not be used for UDT bossThreadSize = 1 #auto default pin or 8 workerThreadSize = "default" } shutdown { # when destroy server, wait seconds wait = 3 } serialization = "seata" compressor = "none" } service { #transaction service group mapping # my_test_tx_group为分组名称,需要与应用中的配置分组名称一致 # "default"为seata-server的集群名称,即seata-server注册到注册中心的名称 vgroupMapping.my_test_tx_group = "default" #only support when registry.type=file, please don't set multiple addresses # 当registry.type=file时,客户端通过ip:port连接到seata-server default.grouplist = "127.0.0.1:8091" #degrade, current not support enableDegrade = false #disable seata disableGlobalTransaction = false } client { rm { asyncCommitBufferLimit = 10000 lock { retryInterval = 10 retryTimes = 30 retryPolicyBranchRollbackOnConflict = true } reportRetryCount = 5 tableMetaCheckEnable = false reportSuccessEnable = false } tm { commitRetryCount = 5 rollbackRetryCount = 5 } undo { dataValidation = true logSerialization = "jackson" # undo表名称,SEATA AT模式需要UNDO_LOG表 logTable = "undo_log" } log { exceptionRate = 100 } } file.conf 配置文件分部分说明

这里说一下配置的内容,已默认的file为例: 在file.conf中有3部分配置内容:

1.transport

transport部分的配置是关于Netty的配置,主要体现在io.seata.core.rpc.netty包下的NettyBaseConfig、NettyServerConfig、NettyClientConfig,client与server的通信使用的是Netty

2.service

仅针对client有效

# service configuration, only used in client side service { #transaction service group mapping #my_test_tx_group--->自定义分布式事务组名称 vgroupMapping.my_test_tx_group = "default" #only support when registry.type=file, please don't set multiple addresses #db模式改配置无效 default.grouplist = "127.0.0.1:8091" #degrade, current not support enableDegrade = false #disable seata #是否启用seata的分布式事务 disableGlobalTransaction = false }

io.seata.spring.annotation.GlobalTransactionScanner

3.client

仅针对client有效

#client transaction configuration, only used in client side client { rm { #RM接收TC的commit通知后缓冲上限 asyncCommitBufferLimit = 10000 lock { #校验或占用全局锁重试间隔 默认10,单位毫秒 retryInterval = 10 #校验或占用全局锁重试次数 默认30 retryTimes = 30 #分支事务与其它全局回滚事务冲突时锁策略 默认true,优先释放本地锁让回滚成功 retryPolicyBranchRollbackOnConflict = true } #一阶段结果上报TC重试次数 默认5次 reportRetryCount = 5 #自动刷新缓存中的表结构 默认false tableMetaCheckEnable = false #是否上报一阶段成功 true、false,从1.1.0版本开始,默认false.true用于保持分支事务生命周期记录完整,false可提高不少性能 reportSuccessEnable = false sqlParserType = druid } tm { #一阶段全局提交结果上报TC重试次数 默认1次,建议大于1 commitRetryCount = 5 #一阶段全局回滚结果上报TC重试次数 默认1次,建议大于1 rollbackRetryCount = 5 } undo { #二阶段回滚镜像校验 默认true开启,false关闭 dataValidation = true #undo序列化方式 默认jackson logSerialization = "jackson" #自定义undo表名 默认undo_log logTable = "undo_log" } log { #日志异常输出概率 默认100,目前用于undo回滚失败时异常堆栈输出,百分之一的概率输出,回滚失败基本是脏数据,无需输出堆栈占用硬盘空间 exceptionRate = 100 } } 项目准备 模块架构

在这里插入图片描述

模块的角色架构

在这里插入图片描述

maven依赖 com.alibaba.cloud spring-cloud-starter-alibaba-seata 2.1.3.RELEASE io.seata seata-spring-boot-starter io.seata seata-spring-boot-starter 1.3.0 事务分组是什么?

于是,上官网查看相关的参数配置,搜索serviceGroup

https://seata.io/zh-cn/docs/user/transaction-group.html

事务分组是什么? 事务分组是 Seata 的资源逻辑,类似于服务实例。在file.conf中的 my_test_tx_group就是一个事务分组。

通过事务分组如何找到后端集群? 首先程序中配置了事务分组(GlobalTransactionScanner 构造方法的txServiceGroup参数),程序会通过用户配置的配置中心去寻找 service.vgroupMapping.事务分组配置项,取得配置项的值就是TC集群的名称。拿到集群名称程序通过一定的前后缀+集群名称去构造服务名, 各配置中心的服务名实现不同。拿到服务名去相应的注册中心去拉取相应服务名的服务列表,获得后端真实的TC服务列表。

为什么这么设计,不直接取服务名? 这里多了一层获取事务分组到映射集群的配置。这样设计后,事务分组可以作为资源的逻辑隔离单位,当发生故障时可以快速failover。

根据第 2 点的说明,就可以知道问题所在了:

TM&RM配置详解 registry.conf:配置注册中心和配置中心,默认是file。 file.conf:seata工作规则信息 DataSourceConfig:配置代理数据源实现分支事务,如果没有注入,事务无法成功回滚 registry.conf配置:配置注册中心和配置中心

该文件包含两部分配置:

注册中心

配置中心

注册中心 registry { # 注册中心配置 # 可选项:file 、nacos 、eureka、redis、zk type = "nacos" # 指定nacos注册中心,默认是file。由于项目整体使用nacos,所以后续选择nacos nacos { serverAddr = "127.0.0.1:8848" namespace = "public" cluster = "default" } eureka { serviceUrl = "http://localhost:1001/eureka" application = "default" weight = "1" } redis { serverAddr = "localhost:6381" db = "0" } zk { cluster = "default" serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 } file { name = "file.conf" } } 配置中心 config { # 配置中心 # 可选项:file、nacos 、apollo、zk type = "file" # 指向file配置中心,也可以指向nacos等其他注册中心 nacos { serverAddr = "localhost" namespace = "public" cluster = "default" } apollo { app.id = "fescar-server" apollo.meta = "http://192.168.1.204:8801" } zk { serverAddr = "127.0.0.1:2181" session.timeout = 6000 connect.timeout = 2000 } file { name = "file.conf" # 通过file.conf配置seata参数,指向第二个配置文件 } } file.conf

该文件的命名取决于registry.conf配置中心的配置

由于registry.conf中配置的是

img

也就是说:file.conf

文件名取决于registry的配置中心(config{...})配置,如果registry配置的配置中心不是file,可以没有改文件。

如果配置中心是nacos,这是file.conf文件就不需要了,把file.conf文件内容交给nacos就可

事务日志存储配置: store { ## store mode: file、db mode = "file" # 存储方式file、db ## file store file { dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions max-branch-session-size = 16384 # globe session size , if exceeded throws exceptions max-global-session-size = 512 # file buffer size , if exceeded allocate new buffer file-write-buffer-cache-size = 16384 # when recover batch read size session.reload.read_size = 100 # async, sync flush-disk-mode = async } ## database store db { driver_class = "" url = "" user = "" password = "" } } TC信息配置

当前微服务在seata服务器中注册的信息配置:

service { #vgroup->rgroup #必须和服务名一致:${spring.applicaiton.name} #vgroup_mapping.${spring.application.name}-fescar-service-group = "default" vgroup_mapping.${spring.application.name}-fescar-service-group = "default" #only support single node default.grouplist = "cdh1:18091" #seata-server服务器地址,默认是8091 #degrade current not support enableDegrade = false #disable disable = false } 客户端相关工作的机制 client { async.commit.buffer.limit = 10000 lock { retry.internal = 10 retry.times = 30 } } DataSourceConfig

每一个微服务原来自己的数据源都必须使用DataSourceProxy代理,这样seata才能掌控所有事务。

@Configuration public class DataSourceConfig { @Bean @ConfigurationProperties(prefix = "spring.datasource") public DruidDataSource druidDataSource() { return new DruidDataSource(); } /** * 需要将 DataSourceProxy 设置为主数据源,否则事务无法回滚 * * @param druidDataSource The DruidDataSource * @return The default datasource */ @Primary @Bean("dataSource") public DataSource dataSource(DruidDataSource druidDataSource) { return new DataSourceProxy(druidDataSource); } } 事务注解

RM主业务方法添加全局事务:@GlobalTransactional

TM分支业务方法添加本地事务注解:@Transactional

说明:以上的配置,如果不能够理解,请参考配套视频

代码实现 10WQPS秒杀实操的AT分布式事务架构

在这里插入图片描述

如果你经过前面的步骤搭建Seata环境完成了,那么你可以尝试一下启动项目,控制台无异常则搭建成功。

那么下面准备以Seata官方文档上的一个经典例子为题,模拟用户下单,创建订单同时扣减库存数量这一过程中产生的分布式事务问题,然后使用Seata解决,正好使用以下Seata的特性。

服务秒杀 seckillController @RestController @RequestMapping("/api/seckill/seglock/") @Api(tags = "秒杀练习分布式事务 版本") public class SeckillBySegmentLockController { @Resource SeataSeckillServiceImpl seataSeckillServiceImpl; /** * 执行秒杀的操作 * 减库存,下订单 *

* { * "exposedKey": "4b70903f6e1aa87788d3ea962f8b2f0e", * "newStockNum": 10000, * "seckillSkuId": 1247695238068177920, * "seckillToken": "0f8459cbae1748c7b14e4cea3d991000", * "userId": 37 * } * * @return */ @ApiOperation(value = "秒杀") @PostMapping("/doSeckill/v1") RestOut doSeckill(@RequestBody SeckillDTO dto) { seataSeckillServiceImpl.doSeckill(dto); return RestOut.success(dto).setRespMsg("秒杀成功"); } } seckillServiceImpl public class SeataSeckillServiceImpl { @Autowired private SeataDemoOrderFeignClient stockFeignClient; @Autowired private SeataDemoStockFeignClient orderFeignClient; /** * 减库存,下订单 */ @GlobalTransactional //开启全局事务(重点) 使用 seata 的全局事务 public void doSeckill(@RequestBody SeckillDTO dto) { orderFeignClient.minusStock(dto); stockFeignClient.addOrder(dto); } } 库存服务的Feign类 @FeignClient(name = "seata-stock-demo", path = "/seata-stock-demo/api/seckill/sku/") public interface SeataDemoStockFeignClient { /** * minusStock 秒杀库存 * * @param dto 商品与库存 * @return 商品 skuDTO */ @RequestMapping(value = "/minusStock/v1", method = RequestMethod.POST) RestOut minusStock(@RequestBody SeckillDTO dto); } 订单服务的Feign类 @FeignClient(name = "seata-order-demo", path = "/seata-order-demo/api/seckill/order/") public interface SeataDemoOrderFeignClient { @RequestMapping(value = "/addOrder/v1", method = RequestMethod.POST) RestOut addOrder(@RequestBody SeckillDTO dto); } 订单服务 orderController @RestController @RequestMapping("/api/seckill/order/") @Api(tags = "秒杀练习 订单管理") public class SeataATOrderController { @Resource SeckillOrderServiceImpl seckillOrderService; /** * 查询用户订单信息 * * @param userId 用户id * @return 商品 dto */ @PostMapping("/user/{id}/list/v1") @ApiOperation(value = "查询用户订单信息") RestOut userOrders( @PathVariable(value = "id") Long userId, @RequestBody PageReq pageReq) { PageOut dto = seckillOrderService.findOrderByUserID(userId, pageReq); if (null != dto) { return RestOut.success(dto).setRespMsg("查询成功"); } return RestOut.error("查询失败"); } /** * 查询用户的订单信息 * * @param userId 用户id * @param skuId 商品id * @return 商品 dto */ @GetMapping("/{userId}/{skuId}/v1") @ApiOperation(value = "查询用户订单信息") RestOut userOrders( @PathVariable(value = "userId") Long userId, @PathVariable(value = "skuId") Long skuId, @RequestBody PageReq pageReq) { List pos = seckillOrderService.findOrderByUserIDAndSkuId(userId, skuId); if (null != pos && pos.size() > 0) { SeckillOrderDTO orderDTO = new SeckillOrderDTO(); BeanUtils.copyProperties(pos.get(0), orderDTO); return RestOut.success(orderDTO).setRespMsg("查询成功"); } return RestOut.error("查询失败"); } /** * 清除用户订单信息 * * @param dto 含有 用户id的dto * @return 操作结果 */ @PostMapping("/user/clear/v1") @ApiOperation(value = "清除用户订单信息") RestOut userOrdersClear(@RequestBody SeckillDTO dto) { Long userId = dto.getUserId(); String result = seckillOrderService.clearOrderByUserID(userId); return RestOut.success(result).setRespMsg("处理完成"); } /** * 执行秒杀的操作 *

*

* { * "exposedKey": "4b70903f6e1aa87788d3ea962f8b2f0e", * "newStockNum": 10000, * "seckillSkuId": 1157197244718385152, * "seckillToken": "0f8459cbae1748c7b14e4cea3d991000", * "userId": 37 * } * * @return */ @ApiOperation(value = "下订单") @PostMapping("/addOrder/v1") RestOut addOrder(@RequestBody SeckillDTO dto) { SeckillOrderDTO orderDTO = seckillOrderService.addOrder(dto); return RestOut.success(orderDTO).setRespMsg("下订单成功"); } } OrderServiceImpl /** * 执行秒杀下单 * * @param inDto * @return */ @Transactional //开启本地事务 // @GlobalTransactional//不,开启全局事务(重点) 使用 seata 的全局事务 public SeckillOrderDTO addOrder(SeckillDTO inDto) { long skuId = inDto.getSeckillSkuId(); Long userId = inDto.getUserId(); /** * 创建订单对象 */ SeckillOrderPO order = SeckillOrderPO.builder() .skuId(skuId).userId(userId).build(); Date nowTime = new Date(); order.setCreateTime(nowTime); order.setStatus(SeckillConstants.ORDER_VALID); SeckillOrderDTO dto = null; /** * 创建重复性检查的订单对象 */ SeckillOrderPO checkOrder = SeckillOrderPO.builder().skuId( order.getSkuId()).userId(order.getUserId()).build(); //记录秒杀订单信息 long insertCount = seckillOrderDao.count(Example.of(checkOrder)); //唯一性判断:skuId,id 保证一个用户只能秒杀一件商品 if (insertCount >= 1) { //重复秒杀 log.error("重复秒杀"); throw BusinessException.builder().errMsg("重复秒杀").build(); } /** * 插入秒杀订单 */ seckillOrderDao.save(order); dto = new SeckillOrderDTO(); BeanUtils.copyProperties(order, dto); return dto; } 库存服务 stockController @Slf4j @RestController @RequestMapping("/api/seckill/sku/") @Api(tags = "商品库存") public class SeataSeckillStockController { @Resource SeataStockServiceImpl seckillSkuStockService; /** * minusStock 秒杀库存 * * @param dto 商品与库存 * @return 商品 skuDTO */ @PostMapping("/minusStock/v1") @ApiOperation(value = "减少秒杀库存") RestOut minusStock(@RequestBody SeckillDTO dto, HttpServletRequest request) { // 绑定 XID,自动创建分支事物 // 异常后,整个调用链路回滚 String keyId = request.getHeader(RootContext.KEY_XID); if (null != keyId) { log.info("RootContext.KEY_XID is {}", keyId); // 绑定 XID,自动创建分支事物 RootContext.bind(keyId); } Long skuId = dto.getSeckillSkuId(); SeckillSkuDTO skuDTO = seckillSkuStockService.minusStock(dto); if (null != skuDTO) { return RestOut.success(skuDTO).setRespMsg("减少秒杀库存成功"); } return RestOut.error("未找到指定秒杀商品"); } } StockServiceImpl /** * 执行秒杀下单 * * @param inDto * @return */ @Transactional public SeckillSkuDTO minusStock(SeckillDTO inDto) { long skuId = inDto.getSeckillSkuId(); Optional optional = seckillSkuDao.findById(skuId); if (!optional.isPresent()) { throw BusinessException.builder().errMsg("商品不存在").build(); } SeckillSkuPO po = optional.get(); if (po.getStockCount()



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3