go

您所在的位置:网站首页 前端rpc调用 go

go

2024-04-04 19:32| 来源: 网络整理| 查看: 265

前期准备

如果对 grpc 还不太了解的,可以看看我的这栏文章https://blog.csdn.net/wanmei002/category_11067794.html

因为 服务发现和服务注册用到了 etcd , 但是最新的 grpc 跟 etcd 不兼容,

所以 protoc-gen-go 跟 grpc 的版本要降级

go get -u github.com/golang/protobuf/[email protected]

go get google.golang.org/[email protected]

先贴以下我的 go.mod

require ( github.com/golang/protobuf v1.4.2 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/tal-tech/go-zero v1.1.4 google.golang.org/grpc v1.29.1 ) 简单初始化 一个 grpc server

以下代码来自 go-zero 文档, 地址: https://github.com/tal-tech/zero-doc/blob/main/doc/shorturl.md

建一个文件,进入 初始化: go mod init *****goctl rpc template -o transform.protogoctl rpc proto -src transform.proto -dir .go get github.com/golang/[email protected] get google.golang.org/[email protected] get 获得所有的依赖查看下 etc/transform.yaml 文件里的etcd 地址:端口 是否正确go run transform.go 运行起来程序 rpc 服务之间的调用 修改配置文件

比如我们又建了一个服务 AAA,要调用 transform 服务

修改 AAA 服务的配置文件 修改在 etc/ 目录下的 yaml 文件,在其中添加 transform 服务的 etcd 信息,添加内容如下: Transform: // 名字可以自己取 Etcd: Hosts: - 127.0.0.1:2379 // transform 服务注册的 etcd Key: transform.rpc // transform 服务注册的 服务名 在 AAA 服务目录下的 internal/svc/servicecontext.go 文件中添加 transform 客户端连接 type ServiceContext struct { Config config.Config Transform transformer.Transformer // Transformer服务 客户端接口 位置一般是: transform/transformer/transformer.go }

创建 transform 服务连接,也是在 servicecontext.go 文件中

func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, // 创建 transform 客户端连接 Transform: transformer.NewTransformer(zrpc.MustNewClient(c.Transform)),// c.Transform 是在 yaml 中配置的 transform 服务的配置信息 } }

逻辑一般都在项目下的 internal/logic 文件夹下, 在 rpc 方法中,在其中调用 其它 gRPC 项目的方法可以 用 l.svcCtx.Transform 对象里的方法

现在我们来了解下 go-zero 服务是怎么运行了 新建的实现 proto 中声明的 rpc 方法 // 第二个参数的值 会赋值给 s 的register属性,然后 调用 s.Start() 方法, 注册服务 s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) { transform.RegisterTransformerServer(grpcServer, srv) }) 让我们看看新建的时候都做了什么 第一步 func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer { server, err := NewServer(c, register)// 这个是第二步 if err != nil { log.Fatal(err) } return server } 第二步 func NewServer(c RpcServerConf, register internal.RegisterFn) (*RpcServer, error) { var err error // 如果要 Auth 认证,需要配置 redis ,这里检查是否配置了 redis if err = c.Validate(); err != nil { return nil, err } var server internal.Server metrics := stat.NewMetrics(c.ListenOn) if c.HasEtcd() { // 检查 配置的 ip listenOn := figureOutListenOn(c.ListenOn) // 重要的一步 连接etcd 并注册服务,并保持续期 这是第三步 server, err = internal.NewRpcPubServer(c.Etcd.Hosts, c.Etcd.Key, listenOn, internal.WithMetrics(metrics)) if err != nil { return nil, err } } else { server = internal.NewRpcServer(c.ListenOn, internal.WithMetrics(metrics)) } server.SetName(c.Name) if err = setupInterceptors(server, c, metrics); err != nil { return nil, err } rpcServer := &RpcServer{ server: server, register: register, } if err = c.SetUp(); err != nil { return nil, err } return rpcServer, nil } 第三步 server, err = internal.NewRpcPubServer(c.Etcd.Hosts, c.Etcd.Key, listenOn, internal.WithMetrics(metrics))

上面的 NewRpcPubServer 方法里的代码

func NewRpcPubServer(etcdEndpoints []string, etcdKey, listenOn string, opts ...ServerOption) (Server, error) { registerEtcd := func() error { // 连接 etcd 并设置租约 pubClient := discov.NewPublisher(etcdEndpoints, etcdKey, listenOn) return pubClient.KeepAlive() } server := keepAliveServer{ registerEtcd: registerEtcd, // 这是第四步 Server: NewRpcServer(listenOn, opts...), } return server, nil } 第四步 func NewRpcServer(address string, opts ...ServerOption) Server { var options rpcServerOptions for _, opt := range opts { opt(&options) } if options.metrics == nil { options.metrics = stat.NewMetrics(address) } // 这个 server 是比较重要的 他有个方法 return &rpcServer{ baseRpcServer: newBaseRpcServer(address, options.metrics), } }

rpcServer 的 Start 方法, 这个方法就是注册grpc 服务 和 注册拦截器

func (s *rpcServer) Start(register RegisterFn) error { lis, err := net.Listen("tcp", s.address) if err != nil { return err } unaryInterceptors := []grpc.UnaryServerInterceptor{ serverinterceptors.UnaryTracingInterceptor(s.name), serverinterceptors.UnaryCrashInterceptor(), serverinterceptors.UnaryStatInterceptor(s.metrics), serverinterceptors.UnaryPrometheusInterceptor(), } unaryInterceptors = append(unaryInterceptors, s.unaryInterceptors...) streamInterceptors := []grpc.StreamServerInterceptor{ serverinterceptors.StreamCrashInterceptor, } streamInterceptors = append(streamInterceptors, s.streamInterceptors...) options := append(s.options, WithUnaryServerInterceptors(unaryInterceptors...), WithStreamServerInterceptors(streamInterceptors...)) server := grpc.NewServer(options...) register(server) // we need to make sure all others are wrapped up // so we do graceful stop at shutdown phase instead of wrap up phase waitForCalled := proc.AddWrapUpListener(func() { server.GracefulStop() }) defer waitForCalled() return server.Serve(lis) } 第五步 启动起来 func (rs *RpcServer) Start() { if err := rs.server.Start(rs.register); err != nil { logx.Error(err) panic(err) } }

就这样 grpc 服务就跑起来了



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3