gRPC

您所在的位置:网站首页 grpc源码解析 gRPC

gRPC

2023-09-27 03:27| 来源: 网络整理| 查看: 265

看完有收获,肯定关注,点赞

gRPC已经断断续续写了七篇文章了,但基本都是属于gRPC的使用上,旨在帮助大家如何使用gRPC,了解gRPC的功能以及特性,并通过示例代码让大家能快速入门,对开发人员而言,一个可运行的demo是最好的教程,文章连接和可运行demo如下,今天我们开始深入一点gRPC服务是怎么启动的,一起揭开他的神秘面纱。

1. gRPC的特性和背后设计的原则2. gRPC的接口描述语言ProtoBuffer3. gRPC之GoLang入门HelloWord4. gRPC之流式调用原理http2协议分析5. gRPC认证的多种方式实践6. gRPC拦截器那点事,希望帮到你7. gRPC注册中心,常用的注册中心你懂了吗?AP还是CP

https://github.com/sunpengwei1992/go_common/tree/master/grpc/helloworld_demo

一个gRPC-Server启动主要以下几行代码,如下一个简单的gRPC-Server就启动起来了,但其流程可不简单,简单的背后意味着封装,一行一行来分析

func StartServer() { lis, _ := net.Listen("tcp", "127.0.0.1:8090") //创建一个grpc服务器对象 gRpcServer := grpc.NewServer() pb.RegisterHelloServiceServer(gRpcServer, &impl.HelloServiceServer{}) //开启服务端 gRpcServer.Serve(lis) }

下面这一行代码从表面很简单,创建了一个grpServer实例,但是这个实例的参数以及入参的参数是非常多了,弄明白了这些参数的含义,后面代码的阅读会舒畅很多

gRpcServer := grpc.NewServer()` 分析NewServer源码 // NewServer creates a gRPC server which has no service registered and has not started to accept requests yet. //NewServer创建一个grpc服务器,该服务器没有注册服务,还不能开始接收请求 func NewServer(opt ...ServerOption) *Server { opts := defaultServerOptions for _, o := range opt { o.apply(&opts) } s := &Server{ lis: make(map[net.Listener]bool), opts: opts, conns: make(map[transport.ServerTransport]bool), m: make(map[string]*service), quit: grpcsync.NewEvent(),//退出事件 done: grpcsync.NewEvent(),//完成事件 czData: new(channelzData), } s.cv = sync.NewCond(&s.mu) if EnableTracing { _, file, line, _ := runtime.Caller(1) s.events = trace.NewEventLog("grpc.Server")) } if channelz.IsOn() { s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") } return s }

上面的代码,总体四个步骤,当然,其中也有诸多细节,暂不深追究,后面会说到

接受入参参数切片,并把默认参数全部赋值到入参上 构造Server实例 判断是否tracing(链路跟踪),IsOn(返回channelz数据收集是否打开) 返回server实例

然后我们说一下入参和返回Server的结构体的参数组成都是什么含义?

ServerOption入参 type serverOptions struct { creds credentials.TransportCredentials //cred证书 codec baseCodec //序列化和反序列化 cp Compressor //压缩接口 dc Decompressor //解压缩接口 unaryInt UnaryServerInterceptor //一元拦截器 streamInt StreamServerInterceptor //流拦截器 inTapHandle tap.ServerInHandle //见下面文档 statsHandler stats.Handler //见下面文档 maxConcurrentStreams uint32 //http2中最大的并发流个数 maxReceiveMessageSize int //最大接受消息大小 maxSendMessageSize int //最大发送消息大小 unknownStreamDesc *StreamDesc keepaliveParams keepalive.ServerParameters //长连接的server参数 keepalivePolicy keepalive.EnforcementPolicy //初始化stream的Window大小,下限值是64K,上限2^31 initialWindowSize int32 //初始化conn大小,一个conn会有多个stream, 等于上面的值 * 16 ,http2的限 制是大于0,默认一个连接有100个流,超过了就被拒绝 initialConnWindowSize int32 writeBufferSize int //写缓冲大小 readBufferSize int //读缓冲大小 connectionTimeout time.Duration //连接超时时间 maxHeaderListSize *uint32 //最大头部大小 }

cred证书是接口grpc内部是有实现的代码包如下,我们使用是只需要,调用方法传入证书文件就可以了

\google.golang.org\[email protected]\credentials\credentials.go creds, err := credentials.NewClientTLSFromFile("E:\\server.pem", "")

inTapHandle tap.ServerInHandle定义在服务器端创建新流之前运行的函数,如果你定义的这个ServerInhandler返回非nil错误,则服务端不会创建流,并将一个rst_stream流发送给具有refused_stream的客户端,它旨在用于您不想服务端创建新的流浪费资源来接受新客户端的情况,比如微服务常见的限流功能,注意,他是在每个conn的协程中执行,而不是在每个rpc的协程中执行中执行。statsHandler stats.Handler这个接口中定义的方法主要是为统计做处理的,比如一次调用中的rpc和conn,默认的实现有如下

ClientHandler //主要是Client端服务的(rpc,conn)跟踪和状态统计 ServerHandler //主要是Server端服务的(rpc,conn)跟踪和状态统计 statshandler //grpc测试用的 Server参数 // Server is a gRPC server to serve RPC requests. type Server struct { opts serverOptions //上面介绍的就是 mu sync.Mutex // guards following lis map[net.Listener]bool //服务端的监听地址 //server transport是所有grpc服务器端传输实现的通用接口。 conns map[transport.ServerTransport]bool serve bool //表示服务是否开启,在Serve()方法中赋值为true drain bool //在调用GracefulStop(优雅的停止服务)方法被赋值为true cv *sync.Cond // 当连接关闭以正常停止时发出信号 m map[string]*service // service name -> service info events trace.EventLog //跟踪事件日志 quit *grpcsync.Event //同步退出事件 done *grpcsync.Event //同步完成事件 channelzRemoveOnce sync.Once serveWG sync.WaitGroup //counts active Serve goroutines for GracefulStop channelzID int64 // channelz unique identification number czData *channelzData //存储一些conn的自增id数据 } 分析RegisterHelloServiceServer源码

我们看到最上面的StartServer代码中调用了pb(proto buffer)的代码,这是自动生成的,这个方法的作用要把HelloService的实现注册到Server上,我们看下面的代码,这个方法的入参有两个,一个是NewServer创建的grpcServer实例,一个是HelloService的实现类,然后调用grpcServer的RegisterService的方法。

func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) { s.RegisterService(&_HelloService_serviceDesc, srv) }

RegisterService方法如下,registerservice将服务及其实现注册到grpc服务器。它是从idl(接口描述语言 Interface Description Lanauage)的代码中调用的。这必须是在调用SERVE方法之前调用。s.register(sd, ss) 方法最终是吧服务的名称的和服务的描述信息注册到上面Server中的map[string]*service 中

func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { ht := reflect.TypeOf(sd.HandlerType).Elem() st := reflect.TypeOf(ss) if !st.Implements(ht) { grpclog.Fatalf("grpc: Server.RegisterService found") } //注册服务 s.register(sd, ss) } //删除了非核心代码 func (s *Server) register(sd *ServiceDesc, ss interface{}) { s.mu.Lock() defer s.mu.Unlock() //构造服务的实例 srv := &service{ server: ss, md: make(map[string]*MethodDesc), sd: make(map[string]*StreamDesc), mdata:sd.Metadata, } //放入方法 for i := range sd.Methods { d := &sd.Methods[i] srv.md[d.MethodName] = d } //放入流 for i := range sd.Streams { d := &sd.Streams[i] srv.sd[d.StreamName] = d } s.m[sd.ServiceName] = srv } 分析gRpcServer.Serve(lis)源码

Server()方法就正式开始监听客户端的连接,并开启协程处理客户端连接,方法核心步骤如下

加锁,初始化一些参数 defer处理最后的资源情况 for循环接受客户端的连接,每一个客户端的连接,开启一个协程处理 func (s *Server) Serve(lis net.Listener) error { s.mu.Lock() s.serve = true s.serveWG.Add(1) //优雅的停止服务 defer func() { s.serveWG.Done() if s.quit.HasFired() {


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3