golang源码分析:dtm分布式事务(1)

您所在的位置:网站首页 dtm模型例子 golang源码分析:dtm分布式事务(1)

golang源码分析:dtm分布式事务(1)

2024-07-11 14:52| 来源: 网络整理| 查看: 265

https://github.com/dtm-labs/dtm和seata类似是一个分布式事务管理器,不过是golang实现的,它有丰富的例子可以供我们学习https://github.com/dtm-labs/dtm-examples。常见的事务模式,支持对比如下:

1,TCC事务:dtm和Seata都支持了TCC事务。2,XA事务:dtm和Seata都支持XA事务。dtm采用的是回调函数形式的接口,而Seata采用的是Java特有的注解形式接口,本质都是回调。3,AT事务:AT事务是Seata独有的事务模式(类似XA,性能更高,但有脏回滚)4,SAGA事务:Seata的Saga实现采用了状态机,优点是可以做到灵活配置,缺点是上手难度非常高。dtm支持并发的Saga。5,二阶段消息:dtm支持了二阶段消息事务模式,该模式受到RocketMQ的事务消息启发。提供了比本地消息表和事务消息更简单的架构,更易用的接口:PrepareAndSubmit,适用于无需回滚的数据一致性场景

总的来说,对于golang用户学习分布式事务是一个非常不错的选择。在学习本篇之前,建议先学习下mysql的XAgolang源码分析:golang使用mysql XA事务,然后会发现大家的最终方案都是相似的。也可以对比seata的golang客户端来学习golang源码分析:seata-go (1)at模式,golang源码分析:seata-go (2)tcc模式。

dtm在传输协议上支持三种,grpc,http和http-json,它的通信链路大概可以分为三条:

1,对外提供服务的链路。

2,ap(应用程序)调用dtm(事务管理器)上报数据的链路

3,dtm回调应用程序的链路。

下面,我们基于dtm-examples的qs例子源码对dtm进行简单介绍:

1,对外提供的服务接口是基于gin http服务实现的,它的端口是8081

代码语言:javascript复制go busi.RunHTTP(app) 代码语言:javascript复制const ( // BusiAPI busi api prefix BusiAPI = "/api/busi" // BusiPort busi server port BusiPort = 8081 // BusiGrpcPort busi server port BusiGrpcPort = 58081 )

提供了转入,转出等多个接口。

代码语言:javascript复制func BaseAddRoute(app *gin.Engine) { app.POST(BusiAPI+"/workflow/resume" app.POST(BusiAPI+"/TransIn",

2,上报数据链路,qs例子,实现了saga模式

代码语言:javascript复制saga := dtmcli.NewSaga(dtmServer, shortuuid.New()).

首先通过uuid产生了全局事务id,然后组装回调需要的接口和参数,然后上报给dtm服务器,地址是http://localhost:36789/api/dtmsvr/submit ,也可以是grpc模式

代码语言:javascript复制 const dtmServer = "http://localhost:36789/api/dtmsvr"

对应实现在github.com/dtm-labs/[email protected]/dtmcli/trans_saga.go

代码语言:javascript复制func NewSaga(server string, gid string) *Saga { &Saga{TransBase: *dtmimp.NewTransBase(gid, "saga", server, ""), orders: map[int][]int{}}

github.com/dtm-labs/[email protected]/dtmcli/dtmimp/trans_base.go

代码语言:javascript复制func NewTransBase(gid string, transType string, dtm string, branchID string) *TransBase { return &TransBase{ Gid: gid, TransType: transType, BranchIDGen: BranchIDGen{BranchID: branchID}, Dtm: dtm, TransOptions: TransOptions{PassthroughHeaders: PassthroughHeaders}, Context: context.Background(), }

然后调用Add方法添加需要上报的信息

代码语言:javascript复制func (s *Saga) Add(action string, compensate string, postData interface{}) *Saga { s.Steps = append(s.Steps, map[string]string{"action": action, "compensate": compensate}) s.Payloads = append(s.Payloads, dtmimp.MustMarshalString(postData))

最后通过submit提交给dtm

代码语言:javascript复制func (s *Saga) Submit() error { s.BuildCustomOptions() return dtmimp.TransCallDtm(&s.TransBase, "submit")代码语言:javascript复制// TransCallDtm is the short call for TransCallDtmExt func TransCallDtm(tb *TransBase, operation string) error { _, err := TransCallDtmExt(tb, tb, operation)代码语言:javascript复制func TransCallDtmExt(tb *TransBase, body interface{}, operation string) (*resty.Response, error) { resp, err := RestyClient.R(). SetBody(body).Post(fmt.Sprintf("%s/%s", tb.Dtm, operation))

3,dtm回调业务的接口也是基于gin的http服务实现的,端口是8082

代码语言:javascript复制// busi address const qsBusiAPI = "/api/busi_start" const qsBusiPort = 8082 var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiAPI)

在启动1中的gin http服务后,会调用dtm的上报接口,将回调信息上报给dtm,其中就包括dtm 回调用的接口,详细信息如下:

代码语言:javascript复制[ { "action": "http://localhost:8082/api/busi_start/TransOut", "compensate": "http://localhost:8082/api/busi_start/TransOutCompensate" }, { "action": "http://localhost:8082/api/busi_start/TransIn", "compensate": "http://localhost:8082/api/busi_start/TransInCompensate" } ], "payloads": ["{mount\:30}","{mount\:30}" ]

对于qs实例来说,就包括两个分支事务的正向操作接口和对应的补偿接口,具体实现在qsAddRoute方法里面:

代码语言:javascript复制func qsAddRoute(app *gin.Engine) { app.POST(qsBusiAPI+"/TransIn",

4,在dtm的server端提供了相应的submit接口,接收客户端也就是应用程序提交的submit请求,对应路由注册在dtmsvr/api_http.go

代码语言:javascript复制func addRoute(engine *gin.Engine) { engine.GET("/api/dtmsvr/newGid", dtmutil.WrapHandler2(newGid)) engine.POST("/api/dtmsvr/prepare", dtmutil.WrapHandler2(prepare)) engine.POST("/api/dtmsvr/submit", dtmutil.WrapHandler2(submit)) engine.POST("/api/dtmsvr/abort", dtmutil.WrapHandler2(abort)) engine.POST("/api/dtmsvr/registerBranch", dtmutil.WrapHandler2(registerBranch))

实现位于dtmsvr/api.go

代码语言:javascript复制func svcSubmit(t *TransGlobal) interface{} 代码语言:javascript复制func submit(c *gin.Context) interface{} { return svcSubmit(TransFromContext(c))

下面我们启动官方的例子跑一下:

首先启动dtm 服务端

代码语言:javascript复制cd dtm go run main.go

然后运行例子中的qs例子

代码语言:javascript复制cd dtm-examples go run main.go qs

可以看到下面的日志

代码语言:javascript复制{"level":"info","ts":"2022-12-18T18:29:24.433+0800","caller":"dtmutil/db.go:103","msg":"connecting 'mysql' 'en.dtm.pub' 'dtm' '3306' ''"} {"level":"debug","ts":"2022-12-18T18:29:24.713+0800","caller":"dtmutil/db.go:79","msg":"installing db plugin: tracePlugin"} {"level":"debug","ts":"2022-12-18T18:29:24.748+0800","caller":"dtmutil/db.go:68","msg":"used: 34 ms affected: -1 sql is: xa recover"} {"level":"debug","ts":"2022-12-18T18:29:24.748+0800","caller":"busi/base_grpc.go:57","msg":"dtm client inited"} {"level":"info","ts":"2022-12-18T18:29:24.748+0800","caller":"busi/base_http.go:54","msg":"examples starting"} {"level":"debug","ts":"2022-12-18T18:29:24.748+0800","caller":"busi/base_http.go:69","msg":"initing BarrierSetup"} {"level":"debug","ts":"2022-12-18T18:29:24.949+0800","caller":"busi/base_http.go:77","msg":"Starting busi at: 8081"} 2022/12/18 18:29:25 quick start examples listening at 8082 {"level":"debug","ts":"2022-12-18T18:29:25.252+0800","caller":"dtmimp/vars.go:46","msg":"requesting: POST http://localhost:36789/api/dtmsvr/submit {\"gid\":\"QHEbkW3zWVwFEpWuAdRwQK\",\"trans_type\":\"saga\",\"concurrent\":false,\"steps\":[{\"action\":\"http://localhost:8082/api/busi_start/TransOut\",\"compensate\":\"http://localhost:8082/api/busi_start/TransOutCompensate\"},{\"action\":\"http://localhost:8082/api/busi_start/TransIn\",\"compensate\":\"http://localhost:8082/api/busi_start/TransInCompensate\"}],\"payloads\":[\"{\\\"amount\\\":30}\",\"{\\\"amount\\\":30}\"],\"protocol\":\"\"} resolved: http://localhost:36789/api/dtmsvr/submit"} {"level":"debug","ts":"2022-12-18T18:29:25.294+0800","caller":"dtmimp/vars.go:54","msg":"requested: 200 POST http://localhost:36789/api/dtmsvr/submit {\"dtm_result\":\"SUCCESS\"}"} 2022/12/18 18:29:25 TransOut 2022/12/18 18:29:25 TransIn 2022/12/18 18:29:25 TransInCompensate 2022/12/18 18:29:25 TransOutCompensate

当然,我们也可以请求下,我们本地服务提供的http服务

代码语言:javascript复制% curl -iv -X POST http://127.0.0.1:8081/api/busi/TransIn -d '{"amount":30}' Note: Unnecessary use of -X or --request, POST is already inferred. * Trying 127.0.0.1:8081... * Connected to 127.0.0.1 (127.0.0.1) port 8081 (#0) > POST /api/busi/TransIn HTTP/1.1 > Host: 127.0.0.1:8081 > User-Agent: curl/7.79.1 > Accept: */* > Content-Length: 13 > Content-Type: application/x-www-form-urlencoded > * Mark bundle as not supporting multiuse < HTTP/1.1 200 OK HTTP/1.1 200 OK < Content-Type: application/json; Content-Type: application/json; charset=utf-8 < Date: Sun, 18 Dec 2022 10:38:13 GMT Date: Sun, 18 Dec 2022 10:38:13 GMT < Content-Length: 4 Content-Length: 4 < * Connection #0 to host 127.0.0.1 left intact nul


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3