gRPC is a modern, open-source, high-performance Remote Procedure Call (RPC) framework that can run in any environment. It can efficiently connect services in and across data centers with pluggable support for load balancing, tracing, health checking, and authentication. It is also applicable in the last mile of distributed computing to connect devices, mobile applications, and browsers to backend services.
// conn 为 grpc connection,可以通过 grpc.Dial 来生成或大部分微服务框架都提供了连接方法
resp,err:=NewOrderServiceClient(conn).GetOrder(context.Background(),&Empty{})iferr!=nil{fmt.Println(err)}// end of rpc call, do own biz
// OrderServiceServer is the server API for OrderService service.
// All implementations must embed UnimplementedOrderServiceServer
// for forward compatibility
// 这里需要说明一下,为了确保服务的稳定性,实现该接口的结构必需包含 UnimplementedOrderServiceServer,这样即便我们只实现其中一部分的方法,也不会导致服务崩溃或不可用。
typeOrderServiceServerinterface{GetOrder(context.Context,*Empty)(*Empty,error)CreateOrder(context.Context,*Empty)(*Empty,error)mustEmbedUnimplementedOrderServiceServer()}// UnimplementedOrderServiceServer must be embedded to have forward compatible implementations.
typeUnimplementedOrderServiceServerstruct{}func(UnimplementedOrderServiceServer)GetOrder(context.Context,*Empty)(*Empty,error){returnnil,status.Errorf(codes.Unimplemented,"method GetOrder not implemented")}func(UnimplementedOrderServiceServer)CreateOrder(context.Context,*Empty)(*Empty,error){returnnil,status.Errorf(codes.Unimplemented,"method CreateOrder not implemented")}func(UnimplementedOrderServiceServer)mustEmbedUnimplementedOrderServiceServer(){}// UnsafeOrderServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to OrderServiceServer will
// result in compilation errors.
type UnsafeOrderServiceServer interface {
	mustEmbedUnimplementedOrderServiceServer()
}
// 这里是我们外部注册入口
funcRegisterOrderServiceServer(sgrpc.ServiceRegistrar,srvOrderServiceServer){s.RegisterService(&OrderService_ServiceDesc,srv)}// 每个接口的处理方法,内部调用的是这个方法
func_OrderService_GetOrder_Handler(srvinterface{},ctxcontext.Context,decfunc(interface{})error,interceptorgrpc.UnaryServerInterceptor)(interface{},error){in:=new(Empty)iferr:=dec(in);err!=nil{returnnil,err}ifinterceptor==nil{returnsrv.(OrderServiceServer).GetOrder(ctx,in)}info:=&grpc.UnaryServerInfo{Server:srv,FullMethod:"/api.user.session.v1.OrderService/GetOrder",}handler:=func(ctxcontext.Context,reqinterface{})(interface{},error){returnsrv.(OrderServiceServer).GetOrder(ctx,req.(*Empty))}returninterceptor(ctx,in,info,handler)}func_OrderService_CreateOrder_Handler(srvinterface{},ctxcontext.Context,decfunc(interface{})error,interceptorgrpc.UnaryServerInterceptor)(interface{},error){in:=new(Empty)iferr:=dec(in);err!=nil{returnnil,err}ifinterceptor==nil{returnsrv.(OrderServiceServer).CreateOrder(ctx,in)}info:=&grpc.UnaryServerInfo{Server:srv,FullMethod:"/api.user.session.v1.OrderService/CreateOrder",}handler:=func(ctxcontext.Context,reqinterface{})(interface{},error){returnsrv.(OrderServiceServer).CreateOrder(ctx,req.(*Empty))}returninterceptor(ctx,in,info,handler)}// OrderService_ServiceDesc is the grpc.ServiceDesc for OrderService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
varOrderService_ServiceDesc=grpc.ServiceDesc{ServiceName:"api.user.session.v1.OrderService",HandlerType:(*OrderServiceServer)(nil),Methods:[]grpc.MethodDesc{{// 内部实现时,先根据 serviceName 确定 service,再根据 methodName 确定 method,然后调用 Handler
MethodName:"GetOrder",Handler:_OrderService_GetOrder_Handler,},{MethodName:"CreateOrder",Handler:_OrderService_CreateOrder_Handler,},},Streams:[]grpc.StreamDesc{},Metadata:"user/session/v1/session.proto",}
// --- service package
packageservice// ...
typeBizOrderstruct{// orderpb 包包含我们之前生成的文件
orderpb.UnimplementedOrderServiceServer}func(s*BizOrder)GetOrder(ctxcontext.Context,in*Empty)(*Empty,error){// do something
return&Empty{},nil}func(s*BizOrder)CreateOrder(ctxcontext.Context,in*Empty)(*Empty,error){// do something
return&Empty{},nil}// --- main package
packagemainfuncmain(){// ... init gprc server
// register service
orderpb.RegisterOrderServiceServer(grpcServer,&service.BizOrder{})}
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
func(t*http2Client)Write(s*Stream,hdr[]byte,data[]byte,opts*Options)error{ifopts.Last{// If it's the last message, update stream state.
if!s.compareAndSwapState(streamActive,streamWriteDone){returnerrStreamDone}}elseifs.getState()!=streamActive{returnerrStreamDone}df:=&dataFrame{streamID:s.id,endStream:opts.Last,h:hdr,d:data,}ifhdr!=nil||data!=nil{// If it's not an empty data frame, check quota.
iferr:=s.wq.get(int32(len(hdr)+len(data)));err!=nil{returnerr}}// controlBuf 底层为一个缓冲区,用于存储控制数据,比如 header 和 data。基于单向链表实现
returnt.controlBuf.put(df)}// writeLoop 内部调用 write 方法,循环发送数据
// The doc comment below is already quite detailed. As described there, the
// write path runs in its own goroutine, which acts as the consumer of a
// linked list: it keeps draining items until none are left and then writes
// the batch to the socket in one go.
//
// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, or/and
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
// This results in writing of HTTP2 frames into an underlying write buffer.
// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
	// The deferred function rewrites the named result: ErrConnClosing is
	// logged at info level (when enabled) and reported to the caller as nil.
	defer func() {
		if err == ErrConnClosing {
			// Don't log ErrConnClosing as error since it happens
			// 1. When the connection is closed by some other known issue.
			// 2. User closed the connection.
			// 3. A graceful close of connection.
			if logger.V(logLevel) {
				logger.Infof("transport: loopyWriter.run returning. %v", err)
			}
			err = nil
		}
	}()
	for {
		// get(true) vs. get(false) below: presumably the argument selects a
		// blocking read — confirm against cbuf.get.
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		// gosched allows at most one yield per outer iteration.
		gosched := true
	hasdata:
		for {
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				// Handle the item according to its type. Per the original
				// annotation, stream data is added to loopyWriter's
				// activeStreams, which is also a linked list.
				if err = l.handle(it); err != nil {
					return err
				}
				// Take one node from activeStreams and write its data into
				// loopyWriter's frame buffer. The first return value is a
				// bool: true when activeStreams is empty, false otherwise.
				if _, err = l.processData(); err != nil {
					return err
				}
				// Done with this item; go fetch the next one.
				continue hasdata
			}
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			// activeStreams still has data that has not been processed.
			if !isEmpty {
				continue hasdata
			}
			if gosched {
				gosched = false
				// If the bytes buffered so far are below minBatchSize (1000
				// per the original annotation), yield the processor once so
				// that more data can arrive before flushing.
				if l.framer.writer.offset < minBatchSize {
					runtime.Gosched()
					continue hasdata
				}
			}
			// Flush the buffered data to the socket.
			l.framer.writer.Flush()
			break hasdata
		}
	}
}
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func(s*Server)handleRawConn(lisAddrstring,rawConnnet.Conn){ifs.quit.HasFired(){rawConn.Close()return}rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))// Finish handshaking (HTTP2)
st:=s.newHTTP2Transport(rawConn)rawConn.SetDeadline(time.Time{})ifst==nil{return}if!s.addConn(lisAddr,st){return}gofunc(){s.serveStreams(st)s.removeConn(lisAddr,st)}()}
// HandleStreams builds the single Stream that represents this HTTP request,
// attaches peer and incoming-metadata information to its context, starts a
// goroutine that copies the request body into the stream's receive buffer,
// and then hands the stream to startStream. It returns only after
// ht.runStream has returned and the body-reading goroutine has exited.
//
// NOTE(review): req, ctx, cancel and requestOver are not declared in the
// visible code; presumably they are set up in the elided section — confirm.
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
	// With this transport type there will be exactly 1 stream: this HTTP request.
	// ...ominous code here...
	s := &Stream{
		id:             0, // irrelevant
		requestRead:    func(int) {},
		cancel:         cancel,
		buf:            newRecvBuffer(),
		st:             ht,
		method:         req.URL.Path,
		recvCompress:   req.Header.Get("grpc-encoding"),
		contentSubtype: ht.contentSubtype,
	}
	pr := &peer.Peer{
		Addr: ht.RemoteAddr(),
	}
	// Record TLS details on the peer only when the request arrived over TLS.
	if req.TLS != nil {
		pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
	}
	ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
	s.ctx = peer.NewContext(ctx, pr)
	// When a stats handler is configured, tag the RPC and report the header.
	if ht.stats != nil {
		s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
		inHeader := &stats.InHeader{
			FullMethod:  s.method,
			RemoteAddr:  ht.RemoteAddr(),
			Compression: s.recvCompress,
		}
		ht.stats.HandleRPC(s.ctx, inHeader)
	}
	// data reader
	s.trReader = &transportReader{
		reader:        &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
		windowHandler: func(int) {},
	}

	// readerDone is closed when the Body.Read-ing goroutine exits.
	readerDone := make(chan struct{})
	go func() {
		defer close(readerDone)

		// TODO: minimize garbage, optimize recvBuffer code/ownership
		const readSize = 8196
		for buf := make([]byte, readSize); ; {
			n, err := req.Body.Read(buf)
			if n > 0 {
				// The three-index slice caps capacity at n, so the buffer
				// handed off cannot grow into the unread remainder of buf.
				s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
				buf = buf[n:]
			}
			// Any read error (mapped via mapRecvMsgError) is forwarded to the
			// receive buffer and ends the reading goroutine.
			if err != nil {
				s.buf.put(recvMsg{err: mapRecvMsgError(err)})
				return
			}
			// The current chunk is used up; allocate a fresh one.
			if len(buf) == 0 {
				buf = make([]byte, readSize)
			}
		}
	}()

	// startStream is provided by the *grpc.Server's serveStreams.
	// It starts a goroutine serving s and exits immediately.
	// The goroutine that is started is the one that then calls
	// into ht, calling WriteHeader, Write, WriteStatus, Close, etc.
	startStream(s)

	ht.runStream()
	close(requestOver)

	// Wait for reading goroutine to finish.
	req.Body.Close()
	<-readerDone
}