GRPC源码分析
2020-07-14
写在前面
grpc 介绍
grpc 是 google 开源的一款高性能的 rpc 框架。github 上介绍如下:
gRPC is a modern, open source, high-performance remote procedure call (RPC) framework that can run anywhere
市面上的 rpc 框架数不胜数,包括 alibaba dubbo 和微博的 motan 等。grpc 能够在众多的框架中脱颖而出是跟其高性能是密切相关的。
CONCEPTS
阅读 grpc 源码之前,我们不妨先了解一些 concepts,github 上也有一些 concepts 介绍
https://github.com/grpc/grpc/blob/master/CONCEPTS.md
1.接口设计
Developers using gRPC start with a language agnostic description of an RPC service (a collection of methods). From this description, gRPC will generate client and server side interfaces in any of the supported languages. The server implements the service interface, which can be remotely invoked by the client interface.
By default, gRPC uses Protocol Buffers as the Interface Definition Language (IDL) for describing both the service interface and the structure of the payload messages. It is possible to use other alternatives if desired.
对一个远程服务 service 的调用,grpc 约定 client 和 server 首先需要约定好 service 的结构。包括一系列方法的组合,每个方法定义、参数、返回体等。对这个结构的描述,grpc 默认是用 protocol buffer 去实现的。
2. Streaming
streaming 在 http/1.x 已经出现了,http2 实现了 streaming 的多路复用。grpc 是基于 http2 实现的。所以 grpc 也实现了 streaming 的多路复用,所以 grpc 的请求有四种模式:Simple RPC、Client-side streaming RPC、Server-side streaming RPC、Bidirectional streaming RPC 。也就是说同时支持单边流和双向流
3. Protocol
grpc 的协议层是基于 http2 设计的,所以你如果想了解 grpc 的话,可以先深入了解 http2
4. Flow Control
grpc 的协议支持流量控制,这里也是采用了 http2 的 flow control 机制。
通过上面的介绍可以看到,grpc 的高性能很大程度上依赖了 http2 的能力,所以要了解 grpc 之前,我们需要先了解以下 http 2 的特性。
http2 特性
- 二进制协议
众所周知,二进制协议比文本形式的协议,发送的数据量肯定是更小,传输效率更高的。所以 http2 比 http/1.x 更高效,因为二进制是不可读的,所以会损失部分可读性。
- 多路复用的流
http/1.x 一个 stream 是需要一个 tcp 连接的,其实从性能上来说是比较浪费的。http2 可以复用 tcp 连接,实现一个 tcp 连接可以处理多个 stream,同时可以为每一个 stream 设置优先级,可以用来告诉对端哪个流更重要。当资源有限的时候,服务器会根据优先级来选择应该先发送哪些流
- 头部压缩
由于 http 协议是一个无状态的协议,导致了很多相同或者类似的 http 请求重复发送时,带的头部信息很多时候是完全一样的。http2 对头部进行压缩,可以减少数据包大小,提高协议性能
- 请求 reset
在 http/1.x 中,当一个含有确切值的 Content-Length 的 http 消息发出之后,需要断开 tcp 连接才能中断。这样会导致需要通过三次握手来重新建立一个新的 tcp 连接,代价是比较大的。在 http2 里面,我们可以通过发送 RST_STREAM 帧来终止当前消息发送,从而避免了浪费带宽和中断已有的连接。
- 服务器推送
如果一个 client 请求资源 A,而 server 知道 client 可能也会需要资源 B, 所以在 client 发起请求前,server 提前将 B 推送给 A 缓存起来,从而可以缩短资源 A 这个请求的响应时间。
- flow control
在 http2 中,每个 http stream 都有自己公示的流量窗口,对于每个 stream 来说,client 和 server 都必须互相告诉对方自己能够处理的窗口大小,stream 中的数据帧大小不得超过能处理的窗口值。
grpc quick start
我们分析 go 版本的 grpc 实现,所以这里主要讲解 grpc-go 的安装和使用
1、安装
go get -u google.golang.org/grpc
2、hello world
grpc-go 官方提供了一些 examples ,都放在 examples 目录下,examples 目录下有三个目录,features 目录主要是 grpc 的一些特写使用,包括路由寻址、keep-alive、负载均衡等。helloworld 目录下主要是提供了一个 helloworld demo。route_guide 目录主要提供了对 grpc 四种调用方式:Simple RPC、Client-side streaming RPC、Server-side streaming RPC、Bidirectional streaming RPC 的模拟调用 demo。
我们通过 grpc 提供的 helloworld demo 来简单了解下如何使用 grpc 。
之前我们说到了 grpc 是通过 protocol buffer 来实现接口的定义的。helloworld 包下已经给我们定义好了一个 helloworld.proto 如下:
syntax = “proto3”;
option java_multiple_files = true;
option java_package = “io.grpc.examples.helloworld”;
option java_outer_classname = “HelloWorldProto”;
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user’s name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
可以看到 SayHello 这个结构体里面已经定义好了一个 rpc Service 调用,
rpc SayHello (HelloRequest) returns (HelloReply) {}
在 Service 中包含一个方法,SayHello, 支持传入一个 HelloRequest 的参数,返回一个 HelloReply 的响应。这两个结构体分别也在 proto 文件里面定义了。
通过 protoc 可以生成一个 pb.go 文件(这里 demo )里面已经生成好了。然后编写一个 client 和 server 即可实现一个完整的 rpc 调用链路,这里 demo 里面也提供了,我们先直接运行一下:
cd $GOPATH/src/google.golang.org/grpc/examples/helloworld
先在一个 terminal 下执行:
go run greeter_server/main.go
然后在另一个 terminal 下执行:
go run greeter_client/main.go
此时 client 会输出 Greeting: Helloworld
2020/07/09 14:59:36 Greeting: Hello world
3、自定义一个 service
这里也是 grpc docs 上一个相同的例子,我贴上来介绍一下,为了让本节内容更完整。
(1)在 pb 文件里面添加一个 rpc 方法
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
// Sends another greeting
rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user’s name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
(2)生成 pb.go 文件
protoc -I helloworld/ helloworld/helloworld.proto —go_out=plugins=grpc:helloworld
(3)在 greeter_server/main.go 下添加一个方法,入参和返回值和 SayHello 完全相同,使用已经定义过的结构体 HelloRequest 和 HelloReply
func (s *server) SayHelloAgain(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: “Hello again “ + in.Name}, nil
}
(4)在 greeter_client/main.go 下添加一个方法调用,调用 SayHelloAgain 这个方法
r, err = c.SayHelloAgain(ctx, &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf(“could not greet: %v”, err)
}
log.Printf(“Greeting: %s”, r.Message)
(5)执行
先在一个 terminal 下执行:
go run greeter_server/main.go
然后在另一个 terminal 下执行:
go run greeter_client/main.go
可以看到 client 的 terminal 输出如下:
2020/07/09 15:17:46 Greeting: Hello world
2020/07/09 15:17:46 Greeting: Hello again world
这篇内容主要介绍了 grpc 的安装以及基本使用。分析了通过 grpc 输出 hello world 的基本要素和过程。
grpc 输出 hello world 的处理过程
我们介绍 grpc quick start 时,通过快速启动一个 grpc server 端和 client 端,然后以 rpc 调用的方式输出一个 hello world。那么输出 hello world 需要经过哪些方法的处理呢?…….这个我也不知道,所以我们先去瞅瞅源码,探究一下 hello world 的背后是连接是如何建立的,然后一起来解读这个问题哈哈。
这节内容我们先来研究一下 server 端连接建立过程。
先放上 server 端的 main 函数。
func main() {
lis, err := net.Listen(“tcp”, port)
if err != nil {
log.Fatalf(“failed to listen: %v”, err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
if err := s.Serve(lis); err != nil {
log.Fatalf(“failed to serve: %v”, err)
}
}
我们发现其实 server 端连接的建立主要包括三步:
- 创建 server
- server 的注册
- 调用 serve 监听端口并处理请求
Ok,弄清楚主流程之后下面我们进入每个步骤里面去看一下代码实现。
1、创建 server
Server 的创建比较简单,其实就下面一个方法:
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
for _, o := range opt {
o.apply(&opts)
}
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[transport.ServerTransport]bool),
m: make(map[string]*service),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
czData: new(channelzData),
}
chainUnaryServerInterceptors(s)
chainStreamServerInterceptors(s)
s.cv = sync.NewCond(&s.mu)
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
}
if s.opts.numServerWorkers > 0 {
s.initServerWorkers()
}
if channelz.IsOn() {
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
}
return s
}
这个方法的核心无非是创建了一个 server 结构体,然后为结构体的属性赋值。我们顺便来瞅瞅 server 的结构:
// Server is a gRPC server to serve RPC requests.
type Server struct {
// serverOptions 就是描述协议的各种参数选项,包括发送和接收的消息大小、buffer大小等等各种,跟 http Headers 类似,我们这里就暂时先不管
opts serverOptions
// 一个互斥锁
mu sync.Mutex // guards following
// listener map
lis map[net.Listener]bool
// connections map
conns map[transport.ServerTransport]bool
// server 是否在处理请求的一个状态位
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
// service map
m map[string]*service // service name -> service info
events trace.EventLog
quit *grpcsync.Event
done *grpcsync.Event
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
channelzID int64 // channelz unique identification number
czData *channelzData
serverWorkerChannels []chan *serverWorkerData
}
虽然 server 结构体里面各种乱起八糟的字段,但是我们可以先不管哈哈哈,比较重要的无非就是三个 map 表分别用来存放多个 listener 、connection 和 service。其他字段都是为了实现协议描述或者并发控制的功能。我们重点关注下
m map[string]*service
这个结构,service 中主要包含了 MethodDesc 和 StreamDesc 这两个 map
type service struct {
server interface{} // the server for service methods
md map[string]*MethodDesc
sd map[string]*StreamDesc
mdata interface{}
}
type MethodDesc struct {
MethodName string
Handler methodHandler
}
type ServiceDesc struct {
ServiceName string
// The pointer to the service interface. Used to check whether the user
// provided implementation satisfies the interface requirements.
HandlerType interface{}
Methods []MethodDesc
Streams []StreamDesc
Metadata interface{}
}
2、server 注册
server 的注册调用了 RegisterGreeterServer 方法,这个方法是 pb.go 文件里面的,如下:
func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
s.RegisterService(&_Greeter_serviceDesc, srv)
}
这个方法调用了 server 的 RegisterService 方法,然后传入了一个 ServiceDesc 的数据结构,如下 :
var _Greeter_serviceDesc = grpc.ServiceDesc{
ServiceName: "helloworld.Greeter",
HandlerType: (*GreeterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _Greeter_SayHello_Handler,
},
{
MethodName: "SayHelloAgain",
Handler: _Greeter_SayHelloAgain_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "helloworld.proto",
}
我们来看看 RegisterService 这个方法,可以看到主要是调用了 register 方法,register 方法则按照方法名为 key,将方法注入到 server 的 service map 中。看到这里我们其实可以预测一下,server 不同 rpc 请求的处理,也是根据 service 中不同的 serviceName 去 service map 中取出不同的 handler 进行处理
// invoking Serve.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
}
s.register(sd, ss)
}
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
if s.serve {
grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
if _, ok := s.m[sd.ServiceName]; ok {
grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
srv := &service{
server: ss,
md: make(map[string]*MethodDesc),
sd: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
srv.md[d.MethodName] = d
}
for i := range sd.Streams {
d := &sd.Streams[i]
srv.sd[d.StreamName] = d
}
s.m[sd.ServiceName] = srv
}
3、Serve 过程
回想所有 C/S 模式下,client 和 server 的通信基本是类似的。大致过程无非是 server 通过死循环的方式在某一个端口实现监听,然后 client 对这个端口发起连接请求,握手成功后建立连接,然后 server 处理 client 发送过来的请求数据,根据请求类型和请求参数,调用不同的 handler 进行处理,回写响应数据。
所以,对 server 端来说,主要是了解其如何实现监听,如何为请求分配不同的 handler 和回写响应数据。
上面我们得知 server 调用了 Serve 方法来进行处理,所以立马就想跟进去看看。
跳过前面一堆条件检查和控制代码,直接锁定了一个 for 循环,如下:(中间的一些代码已省略)
for {
rawConn, err := lis.Accept()
……
s.serveWG.Add(1)
go func() {
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
Ok,我们已经看到了监听过程,server 的监听果然是通过一个死循环 调用了 lis.Accept() 进行端口监听。
继续往下看,我们发现新起协程调用了 handleRawConn 这个方法,为了节约篇幅,我们直接看重点代码,如下:
func (s *Server) handleRawConn(rawConn net.Conn) {
……
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
……
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
……
go func() {
s.serveStreams(st)
s.removeConn(st)
}()
}
可以看到 handleRawConn 里面实现了 http 的 handshake,还记得之前我们说过,grpc 是基于 http2 实现的吗?这里是不是实锤了……. 发现又通过一个新的协程调用了 serveStreams 这个方法,这个方法干了啥呢?
func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup
var roundRobinCounter uint32
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
if s.opts.numServerWorkers > 0 {
data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
default:
// If all stream workers are busy, fallback to the default code path.
go func() {
s.handleStream(st, stream, s.traceInfo(st, stream))
wg.Done()
}()
}
} else {
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
}
tr := trace.New("grpc.Recv."+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()
}
其实它主要调用了 handleStream ,继续跟进 handleStream 方法,我们发现了重要线索,如下(省略了部分无关代码)
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
sm := stream.Method()
…
service := sm[:pos]
method := sm[pos+1:]
srv, knownService := s.m[service]
if knownService {
if md, ok := srv.md[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.sd[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
…
}
重要线索就是这一行
srv, knownService := s.m[service]
还记得我们之前的预测吗?根据 serviceName 去 server 中的 service map,也就是 m 这个字段,里面去取出 handler 进行处理。我们 hello world 这个 demo 的请求不涉及到 stream ,所以直接取出 handler ,然后传给 processUnaryRPC 这个方法进行处理。
if md, ok := srv.md[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
再来看看 processUnaryRpc 这个方法
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
…
sh := s.opts.statsHandler
if sh != nil {
beginTime := time.Now()
begin := &stats.Begin{
BeginTime: beginTime,
}
sh.HandleRPC(stream.Context(), begin)
defer func() {
end := &stats.End{
BeginTime: beginTime,
EndTime: time.Now(),
}
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
sh.HandleRPC(stream.Context(), end)
}()
}
…
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
}
if s, ok := status.FromError(err); ok {
if e := t.WriteStatus(stream, s); e != nil {
grpclog.Warningf(“grpc: Server.processUnaryRPC failed to write status: %v”, e)
}
} else {
switch st := err.(type) {
case transport.ConnectionError:
// Nothing to do here.
default:
panic(fmt.Sprintf(“grpc: Unexpected error (%T) from sendResponse: %v”, st, st))
}
}
if binlog != nil {
h, _ := stream.Header()
binlog.Log(&binarylog.ServerHeader{
Header: h,
})
binlog.Log(&binarylog.ServerTrailer{
Trailer: stream.Trailer(),
Err: appErr,
})
}
return err
}
…
}
我们终于看到了 handler 对 rpc 的处理:
sh := s.opts.statsHandler
sh.HandleRPC(stream.Context(), begin)
sh.HandleRPC(stream.Context(), end)
同时也看到了 response 的回写
s.sendResponse(t, stream, reply, cp, opts, comp)
至此,server 端我们的目标实现,追踪到了整个请求和监听、handler 处理 和 response 回写的过程。
grpc hello world client 解析
来先看代码:
func main() {
// Set up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the server and print out its response.
name := defaultName
if len(os.Args) > 1 {
name = os.Args[1]
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.Message)
}
可以看到 client 的建立也可以大致分为 3 步:
- 创建一个客户端连接 conn
- 通过一个 conn 创建一个客户端
- 发起 rpc 调用
ok,那我们开始 step by step ,具体看看每一步做了啥
1)创建一个客户端连接 conn
conn, err := grpc.Dial(address, grpc.WithInsecure())
通过 Dial 方法创建 conn,Dial 调用了 DialContext 方法
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
跟进 DialContext,发现 DialContext 这个方法非常长,这里就不贴代码了,具体就是先实例化了一个 ClientConn 的结构体,然后主要为 ClientConn 的 dopts 的各个属性进行初始化赋值。
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
我们先来看看 ClientConn 的结构
type ClientConn struct {
ctx context.Context
cancel context.CancelFunc
target string
parsedTarget resolver.Target
authority string
dopts dialOptions
csMgr *connectivityStateManager
balancerBuildOpts balancer.BuildOptions
blockingpicker *pickerWrapper
mu sync.RWMutex
resolverWrapper *ccResolverWrapper
sc *ServiceConfig
conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
curBalancerName string
balancerWrapper *ccBalancerWrapper
retryThrottler atomic.Value
firstResolveEvent *grpcsync.Event
channelzID int64 // channelz unique identification number
czData *channelzData
}
dialOptions 其实就是对客户端属性的一些设置,包括压缩解压缩、是否需要认证、超时时间、是否重试等信息。
这里我们来看一下初始化了哪些属性:
connectivityStateManager
type connectivityStateManager struct {
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID int64
}
连接的状态管理器,每个连接具有 “IDLE”、“CONNECTING”、“READY”、“TRANSIENT_FAILURE”、“SHUTDOWN”、“Invalid-State” 这几种状态。
pickerWrapper
type pickerWrapper struct {
mu sync.Mutex
done bool
blockingCh chan struct{}
picker balancer.Picker
}
pickerWrapper 是对 balancer.Picker 的一层封装,balancer.Picker 其实是一个负载均衡器,它里面只有一个 Pick 方法,它返回一个 SubConn 连接。
type Picker interface {
Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
}
什么是 SubConn 呢?看一下这个类的介绍
// Each sub connection contains a list of addresses. gRPC will
// try to connect to them (in sequence), and stop trying the
// remainder once one connection is successful.
这里我们就明白了,在分布式环境下,可能会存在多个 client 和 多个 server,client 发起一个 rpc 调用之前,需要通过 balancer 去找到一个 server 的 address,balancer 的 Picker 类返回一个 SubConn,SubConn 里面包含了多个 server 的 address,假如返回的 SubConn 是 “READY” 状态,grpc 会发送 RPC 请求,否则则会阻塞,等待 UpdateBalancerState 这个方法更新连接的状态并且通过 picker 获取一个新的 SubConn 连接。
channelz
channelz 主要用来监测 server 和 channel 的状态,这里的概念和实现比较复杂,暂时不进行深入研究,感兴趣的同学可以参考:https://github.com/grpc/proposal/blob/master/A14-channelz.md 这个 proposal ,初始化的代码如下:
if channelz.IsOn() {
if cc.dopts.channelzParentID != 0 {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
Severity: channelz.CtINFO,
},
})
} else {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
})
}
cc.csMgr.channelzID = cc.channelzID
}
Authentication
这一段是对认证信息的初始化校验,这里暂不研究,感兴趣的同学可以了解下 :https://grpc.io/docs/guides/auth/
if !cc.dopts.insecure {
if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
return nil, errNoTransportSecurity
}
if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
return nil, errTransportCredsAndBundle
}
} else {
if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
return nil, errCredentialsConflict
}
for _, cd := range cc.dopts.copts.PerRPCCredentials {
if cd.RequireTransportSecurity() {
return nil, errTransportCredentialsMissing
}
}
}
Dialer
Dialer 是发起 rpc 请求的调用器,dialer 中实现了对 rpc 请求调用的具体细节,所以可以算是我们的重点研究对象之一。dialer 中包括了连接建立、地址解析、服务发现、长连接等等具体策略的实现。
if cc.dopts.copts.Dialer == nil {
cc.dopts.copts.Dialer = newProxyDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
network, addr := parseDialTarget(addr)
return (&net.Dialer{}).DialContext(ctx, network, addr)
},
)
}
这一段方法只是简单进行了地址的规则解析,我们具体看 DialContext 方法,其中有一行:
addrs, err := d.resolver().resolveAddrList(resolveCtx, "dial", network, address, d.LocalAddr)
可以看到通过 dialer 的 resolver 来进行服务发现,这里以后我们再单独详细讲解。
这里值得一提的是,通过 dialContext 可以看出,这里的 dial 有两种请求方式,一种是 dialParallel , 另一种是 dialSerial。dialParallel 发出两个完全相同的请求,采用第一个返回的结果,抛弃掉第二个请求。dialSerial 则是发出一串(多个)请求。然后采取第一个返回的请求结果( 成功或者失败)。
scChan
scChan 是 dialOptions 中的一个属性,定义如下:
scChan <-chan ServiceConfig
可以看到其实他是一个 ServiceConfig类型的一个 channel,那么 ServiceConfig 是什么呢?源码中对这个类的介绍如下:
// ServiceConfig is provided by the service provider and contains parameters for how clients that connect to the service should behave.
通过介绍得知 ServiceConfig 是服务提供方约定的一些参数。这里说明 client 提供给 server 一个可以通过 channel 来修改这些参数的入口。这里到时候我们介绍服务发现时可以细讲,我们在这里只需要知道 client 的某些属性是可以被 server 修改的就行了
if cc.dopts.scChan != nil {
// Try to get an initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
scSet = true
}
default:
}
}
2)通过一个 conn 创建一个客户端
通过一个 conn 创建客户端的代码如下:
c := pb.NewGreeterClient(conn)
这一步非常简单,其实是 pb 文件中生成的代码,就是创建一个 greeterClient 的客户端。
type greeterClient struct {
cc *grpc.ClientConn
}
func NewGreeterClient(cc *grpc.ClientConn) GreeterClient {
return &greeterClient{cc}
}
3)发起 rpc 调用
前面在创建 Dialer 的时候,我们已经将请求的 target 解析成了 address。我们猜这一步应该是向指定 address 发起 rpc 请求了。来具体看看
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
SayHello 方法是通过调用 Invoke 的方法去发起 rpc 调用, Invoke 方法如下:
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
return invoke(ctx, method, args, reply, cc, opts...)
}
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
Invoke 方法调用了 invoke, 在 invoke 这个方法里面,果然不出所料,我们看到了 sendMsg 和 recvMsg 接口,这两个接口在 clientStream 中被实现了。
SendMsg
我们先来看看 clientStream 中定义的 sendMsg,关键代码如下:
func (cs *clientStream) SendMsg(m interface{}) (err error) {
...
// load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
if err != nil {
return err
}
...
op := func(a *csAttempt) error {
err := a.sendMsg(m, hdr, payload, data)
// nil out the message and uncomp when replaying; they are only needed for
// stats which is disabled for subsequent attempts.
m, data = nil, nil
return err
}
}
先准备数据,然后再调用 csAttempt 这个结构体中的 sendMsg 方法,
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
cs := a.cs
if a.trInfo != nil {
a.mu.Lock()
if a.trInfo.tr != nil {
a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
}
a.mu.Unlock()
}
if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
if !cs.desc.ClientStreams {
// For non-client-streaming RPCs, we return nil instead of EOF on error
// because the generated code requires it. finish is not called; RecvMsg()
// will call it with the stream's status independently.
return nil
}
return io.EOF
}
if a.statsHandler != nil {
a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
}
if channelz.IsOn() {
a.t.IncrMsgSent()
}
return nil
}
最终是通过 a.t.Write 发出的数据写操作,a.t 是一个 ClientTransport 类型,所以最终是通过 ClientTransport 这个结构体的 Write 方法发送数据
RecvMsg
发送数据是通过 ClientTransport 的 Write 方法,我们猜测接收数据肯定是某个结构体的 Read 方法。这里我们来详细看一下
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
...
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
...
if a.statsHandler != nil {
a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
// TODO truncate large payload.
Data: payInfo.uncompressedBytes,
WireLength: payInfo.wireLength,
Length: len(payInfo.uncompressedBytes),
})
}
...
}
可以看到调用了 recv 方法:
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
...
}
再看 recvAndDecompress 方法,调用了 recvMsg
func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
pf, d, err := p.recvMsg(maxReceiveMessageSize)
...
}
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := p.r.Read(p.header[:]); err != nil {
return 0, nil, err
}
...
}
这里比较清楚了,最终还是调用了 p.r.Read 方法,p.r 是一个 io.Reader 类型。果然万变不离其中,最终都是要落到 IO 上。