楔子
本节将介绍 grpc 客户端是如何跟 grpc 服务器端建立起连接的。
- 服务端流程
- 初始化 grpc 服务器,注册提供的服务
- 启动 grpc 服务器,监听 tcp 请求
- 建立 tcp 连接
- 建立 http2 连接
- 消息交互
- 客户端流程
- 初始化 grpc 客户端
- 发起 rpc 调用,resolver 解析服务器地址列表,balancer 选择目标地址
- 建立 tcp 连接
- 建立 http 2 连接
服务器端
main 代码
启动 gRPC 服务器。
// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
}
func main() {
flag.Parse()
// 创建一个监听,协议是tcp,端口号是port
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
// 创建 grpc 服务器
s := grpc.NewServer()
// 将实现的 service 注册到 grpc 服务器中
pb.RegisterGreeterServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
// 启动 grpc 服务器
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
监听 tcp 请求
以阻塞方式监听客户端的 TCP 请求。
func (s *Server) Serve(lis net.Listener) error {
// 阻塞式监听客户端的请求
for {
// 使用 net 包中的 Listener 来监听客户端的 tcp 请求
rawConn, err := lis.Accept()
...
// 收到请求后会创建新的协程来处理此请求
go func() {
s.handleRawConn(lis.Addr().String(), rawConn)
}()
}
}
处理 tcp 请求
在接收到请求后,服务器与客户端建立 http2 连接。
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
// 检测 grpc 服务器是否关闭
if s.quit.HasFired() {
rawConn.Close()
return
}
// 设置 tcp 链路的 deadline
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
// 完成 http2 握手,创建 http2Server
// 跟客户端交换帧的初始化信息,如帧的大小,窗口大小等
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(rawConn)
rawConn.SetDeadline(time.Time{})
if st == nil {
return
}
...
// 创建新的协程来处理流
go func() {
s.serveStreams(context.Background(), st, rawConn)
s.removeConn(lisAddr, st)
}()
}
http2 握手
创建 http2Server
在 gRPC 连接的 HTTP/2 握手阶段,客户端向服务器发送帧,服务器响应并发送其帧大小和初始化窗口大小等信息。同时,服务器接收客户端的帧信息并进行相应设置。通过这些交互,双方确定了各自的数据接收能力,然后进入流的处理阶段。
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
...
st, err := transport.NewServerTransport(c, config)
...
return st
}
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
// 握手
if config.Credentials != nil {
conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
}
...
// 创建 http2 的帧,内部调用了 http2.NewFramer
framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
// 初始化设置帧,类型是 http2.Setting 切片
// 包括:帧的最大值,最大流数,初始化窗口的大小等
// Send initial settings as connection preface to client.
isettings := []http2.Setting{{
ID: http2.SettingMaxFrameSize,
Val: http2MaxFrameLen,
}}
...
// 发送 http2.Setting 配置给客户端,客户端会根据这些信息,更新本地的相应设置。
if err := framer.fr.WriteSettings(isettings...); err != nil {
return nil, connectionErrorf(false, err, "transport: %v", err)
}
// 发送窗口更新帧
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
return nil, connectionErrorf(false, err, "transport: %v", err)
}
}
...
t := &http2Server{}
...
// 读取客户端发送的 PRI,跟服务器端存储的 clientPreface 值进行比较,校验合法性
// Check the validity of client preface.
preface := make([]byte, len(clientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
...
}
if !bytes.Equal(preface, clientPreface) {
return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
}
// 读取客户端发送的帧,并转换为设置帧,然后更新本地配置
frame, err := t.framer.fr.ReadFrame()
...
sf, ok := frame.(*http2.SettingsFrame)
...
t.handleSettings(sf)
// 启动帧发送器
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
err := t.loopy.run()
...
}()
// 启动 keepalive
go t.keepalive()
return t, nil
}
客户端初始化
main 代码
在 grpc-go v1.63 版本之后,新建客户端使用 grpc.NewClient
, 而不是 grpc.Dial
或 grpc.DialContext
,这两个方法标记为 Deprecated,不推荐使用。
NewClient
用于从客户端应用程序创建与 gRPC 服务器的虚拟连接。它接收一个目标 URI(表示逻辑后端服务的名称并解析为一个或多个物理地址)和一组选项,返回一个 ClientConn
对象,表示与服务器的虚拟连接。ClientConn
包含一个或多个实际的连接,并在连接中断时自动尝试重连。
NewClient
和之前 Dial
的区别:
- 不会执行 I/O 操作,即新建客户端时不会立刻开始连接,这样和其他语言的 gRPC 工作方式保持了一致。
- 使用
ClientConn
进行 RPC 调用将自动使其连接。 - 默认 resolver 使用
dns
,之前的默认 resolver 使用的是passthrough
func main() {
// 传入 target URI 和 options 创建 ClientConn
conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
...
c := pb.NewGreeterClient(conn)
...
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
}
新建 Client
建立 grpc 管道
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
// 创建 ClientConn 对象
cc := &ClientConn{
target: target,
// 服务器地址 map,一个 target 可能由多个服务器提供服务
conns: make(map[*addrConn]struct{}),
// 默认的连接配置,如 target 使用的 scheme、读缓存大小、写缓存大小、重连机制等等
dopts: defaultDialOptions(),
}
...
// 设置默认的连接配置
if !disableGlobalOpts {
for _, opt := range globalDialOptions {
opt.apply(&cc.dopts)
}
}
// 设置用户的连接配置
for _, opt := range opts {
opt.apply(&cc.dopts)
}
// 设置 resolver
// Determine the resolver to use.
if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
return nil, err
}
...
// 设置 interceptor
// 包含如下用户传入的:
// grpc.WithStreamInterceptor()
// grpc.WithChainStreamInterceptor()
// grpc.WithUnaryInterceptor()
// grpc.WithChainUnaryInterceptor
chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)
// 校验 Credentials
// 包含如下用户传入的:
// grpc.WithInsecure()
// grpc.WithPerRPCCredentials()
// grpc.WithTransportCredentials()
if err := cc.validateTransportCredentials(); err != nil {
return nil, err
}
// 解析 ServiceConfig
// 包含如下用户传入的:
// grpc.WithDefaultServiceConfig()
if cc.dopts.defaultServiceConfigRawJSON != nil {
scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
if scpr.Err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
}
cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
}
cc.mkp = cc.dopts.copts.KeepaliveParams
if err = cc.initAuthority(); err != nil {
return nil, err
}
// Register ClientConn with channelz. Note that this is only done after
// channel creation cannot fail.
cc.channelzRegistration(target)
channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget)
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
// 初始化csMgr,保存连接状态
// 如 Idle、Connecting、Ready、TransientFailure、Shutdown
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)
// 初始化 wrappers、conns,保存状态是 idle
cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)
return cc, nil
}
设置自定义 options
go-idiom,给一个默认设置,再提供一些更改默认设置的方法参数。
for _, opt := range opts {
opt.apply(&cc.dopts)
}
// DialOption configures how we set up the connection.
type DialOption interface {
apply(*dialOptions)
}
// funcDialOption wraps a function that modifies dialOptions into an
// implementation of the DialOption interface.
type funcDialOption struct {
f func(*dialOptions)
}
func (fdo *funcDialOption) apply(do *dialOptions) {
fdo.f(do)
}
func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
return &funcDialOption{
f: f,
}
}
设置 interceptor
洋葱模型,将所有的 interceptor 组装到一起
// chainUnaryClientInterceptors chains all unary client interceptors into one.
func chainUnaryClientInterceptors(cc *ClientConn) {
interceptors := cc.dopts.chainUnaryInts
// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
// be executed before any other chained interceptors.
if cc.dopts.unaryInt != nil {
interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
}
var chainedInt UnaryClientInterceptor
if len(interceptors) == 0 {
chainedInt = nil
} else if len(interceptors) == 1 {
chainedInt = interceptors[0]
} else {
chainedInt = func(ctx context.Context, method string, req, reply any, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
}
}
cc.dopts.unaryInt = chainedInt
}
创建 resolver builder
parseTarget 的三个属性:
- Scheme string: 如 dns,example,passthrough 等,如 dns 表示该解析器底层是通过调用dns服务来解析用户指定的地址的
- Authority string:
- Endpoint string: 如 localhost: 50051,或域名 foo.bar 等
grpc.WithResolvers
的优先级别高于resolver.Register
func (cc *ClientConn) initParsedTargetAndResolverBuilder() error {
logger.Infof("original dial target is: %q", cc.target)
var rb resolver.Builder
// 解析 cc.target 得到 ClientConn 中的属性 parseTarget
// parseTarget 内部只有一个 url.URL
parsedTarget, err := parseTarget(cc.target)
if err == nil {
// 初始化 resolver builder
rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb != nil {
cc.parsedTarget = parsedTarget
cc.resolverBuilder = rb
return nil
}
}
// We are here because the user's dial target did not contain a scheme or
// specified an unregistered scheme. We should fallback to the default
// scheme, except when a custom dialer is specified in which case, we should
// always use passthrough scheme. For either case, we need to respect any overridden
// global defaults set by the user.
defScheme := cc.dopts.defaultScheme
if internal.UserSetDefaultScheme {
defScheme = resolver.GetDefaultScheme()
}
canonicalTarget := defScheme + ":///" + cc.target
parsedTarget, err = parseTarget(canonicalTarget)
if err != nil {
return err
}
rb = cc.getResolver(parsedTarget.URL.Scheme)
if rb == nil {
return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
}
cc.parsedTarget = parsedTarget
cc.resolverBuilder = rb
return nil
}
校验 credential
如果想设置此次的 tcp 连接为非安全连接的话,需要显示的设置 grpc.WithTransportCredentials(insecure.NewCredentials())
,如果没有设置默认是安全的。
func (cc *ClientConn) validateTransportCredentials() error {
if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
return errNoTransportSecurity
}
if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
return errTransportCredsAndBundle
}
if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
return errNoTransportCredsInBundle
}
transportCreds := cc.dopts.copts.TransportCredentials
if transportCreds == nil {
transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
}
if transportCreds.Info().SecurityProtocol == "insecure" {
for _, cd := range cc.dopts.copts.PerRPCCredentials {
if cd.RequireTransportSecurity() {
return errTransportCredentialsMissing
}
}
}
return nil
}
解析 ServiceConfig
if cc.dopts.defaultServiceConfigRawJSON != nil {
scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
if scpr.Err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
}
cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
}
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonSC struct {
LoadBalancingPolicy *string
LoadBalancingConfig *json.RawMessage
MethodConfig *[]jsonMC
RetryThrottling *retryThrottlingPolicy
HealthCheckConfig *healthCheckConfig
}
自定义 Dialer
在向 grpc 服务器端发起连接请求时会调用这个函数,若没有指定则使用默认的 dialer
func WithContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.Dialer = f
})
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
address := addr.Addr
networkType, ok := networktype.Get(addr)
if fn != nil {
// Special handling for unix scheme with custom dialer. Back in the day,
// we did not have a unix resolver and therefore targets with a unix
// scheme would end up using the passthrough resolver. So, user's used a
// custom dialer in this case and expected the original dial target to
// be passed to the custom dialer. Now, we have a unix resolver. But if
// a custom dialer is specified, we want to retain the old behavior in
// terms of the address being passed to the custom dialer.
if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
// Supported unix targets are either "unix://absolute-path" or
// "unix:relative-path".
if filepath.IsAbs(address) {
return fn(ctx, "unix://"+address)
}
return fn(ctx, "unix:"+address)
}
return fn(ctx, address)
}
if !ok {
networkType, address = parseDialTarget(address)
}
if networkType == "tcp" && useProxy {
return proxyDial(ctx, address, grpcUA)
}
return internal.NetDialerWithTCPKeepalive().DialContext(ctx, networkType, address)
}
客户端发起调用
发起调用
发起 rpc 调用
func main() {
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
}
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
err := c.cc.Invoke(ctx, Greeter_SayHello_FullMethodName, in, out, cOpts...)
}
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
return invoke(ctx, method, args, reply, cc, opts...)
}
func invoke(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
return nil, err
}
...
}
创建 resolver
Resolver 的作用是监视 target 的变化,在发生变化时更新 resolver.State(包含 address 和 service config)。 在上面一连串的调用后,会触发 OnCallBegin,其中调用了 resolverBuilder.Build,初始化了 resolver。在 resolver 初始化时会调用 clientConn 的 UpdateState 方法,在 UpdateState 中会调用 updateResolverStateAndUnlock,引发了 balancer 的初始化动作。
func (ccr *ccResolverWrapper) start() error {
errCh := make(chan error)
ccr.serializer.TrySchedule(func(ctx context.Context) {
ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts)
})
return <-errCh
}
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
ccr.cc.mu.Lock()
ccr.mu.Lock()
if ccr.closed {
ccr.mu.Unlock()
ccr.cc.mu.Unlock()
return nil
}
if s.Endpoints == nil {
s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
for _, a := range s.Addresses {
ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
ep.Addresses[0].BalancerAttributes = nil
s.Endpoints = append(s.Endpoints, ep)
}
}
ccr.addChannelzTraceEvent(s)
ccr.curState = s
ccr.mu.Unlock()
return ccr.cc.updateResolverStateAndUnlock(s, nil)
}
func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error {
defer cc.firstResolveEvent.Fire()
// Check if the ClientConn is already closed. Some fields (e.g.
// balancerWrapper) are set to nil when closing the ClientConn, and could
// cause nil pointer panic if we don't have this check.
if cc.conns == nil {
cc.mu.Unlock()
return nil
}
if err != nil {
// May need to apply the initial service config in case the resolver
// doesn't support service configs, or doesn't provide a service config
// with the new addresses.
cc.maybeApplyDefaultServiceConfig()
cc.balancerWrapper.resolverError(err)
// No addresses are valid with err set; return early.
cc.mu.Unlock()
return balancer.ErrBadResolverState
}
var ret error
// 根据初始条件下是否配置了serviceConfig,会执行不同的分支
// 如果用户没有设置 ServiceConfig 或者显示的设置了 grpc.WithDisableServiceConfig()的话,就会执行 maybeApplyDefaultServiceConfig 方法,其中会调用 applyServiceConfigAndBalancer
// 如果用户设置了 ServiceConfig 的话,就会执行 applyServiceConfigAndBalancer
if cc.dopts.disableServiceConfig {
channelz.Infof(logger, cc.channelz, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
cc.maybeApplyDefaultServiceConfig()
} else if s.ServiceConfig == nil {
cc.maybeApplyDefaultServiceConfig()
// TODO: do we need to apply a failing LB policy if there is no
// default, per the error handling design?
} else {
if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
configSelector := iresolver.GetConfigSelector(s)
if configSelector != nil {
if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
channelz.Infof(logger, cc.channelz, "method configs in service config will be ignored due to presence of config selector")
}
} else {
configSelector = &defaultConfigSelector{sc}
}
cc.applyServiceConfigAndBalancer(sc, configSelector)
} else {
ret = balancer.ErrBadResolverState
if cc.sc == nil {
// Apply the failing LB only if we haven't received valid service config
// from the name resolver in the past.
cc.applyFailingLBLocked(s.ServiceConfig)
cc.mu.Unlock()
return ret
}
}
}
var balCfg serviceconfig.LoadBalancingConfig
if cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig
}
bw := cc.balancerWrapper
cc.mu.Unlock()
// 更新了 balancer 的配置
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
ret = uccsErr // prefer ErrBadResolver state since any other error is
// currently meaningless to the caller.
}
return ret
}
初始化 service config,若未指定则使用默认的 emptyServiceConfig,而 emptyServiceConfig 在初始化时使用的就是 pick first 的 balancer 实现。
func (cc *ClientConn) maybeApplyDefaultServiceConfig() {
if cc.sc != nil {
cc.applyServiceConfigAndBalancer(cc.sc, nil)
return
}
if cc.dopts.defaultServiceConfig != nil {
cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig})
} else {
cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig})
}
}
func parseServiceConfig(js string, maxAttempts int) *serviceconfig.ParseResult {
...
if c == nil {
name := pickfirst.Name
}
}
创建 balancer
Balancer 的作用是从 gRPC 接收输入,管理 SubConns,并收集和聚合连接状态。它还生成并更新 gRPC 使用的 Picker,以选择用于 RPC 的 SubConns。
初始化的时候创建了 ccBalancerWrapper,底层的 balancer 并未创建。 底层的 balancer 会在调用 updateClientConnState 创建。
// newCCBalancerWrapper creates a new balancer wrapper in idle state. The
// underlying balancer is not created until the updateClientConnState() method
// is invoked.
func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
ctx, cancel := context.WithCancel(cc.ctx)
ccb := &ccBalancerWrapper{
cc: cc,
opts: balancer.BuildOptions{
DialCreds: cc.dopts.copts.TransportCredentials,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParent: cc.channelz,
Target: cc.parsedTarget,
MetricsRecorder: cc.metricsRecorderList,
},
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
}
ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
return ccb
}
配置变化时更新 balancer
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
errCh := make(chan error)
uccs := func(ctx context.Context) {
defer close(errCh)
if ctx.Err() != nil || ccb.balancer == nil {
return
}
// 获取 balancer name
name := gracefulswitch.ChildName(ccs.BalancerConfig)
if ccb.curBalancerName != name {
ccb.curBalancerName = name
channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
}
err := ccb.balancer.UpdateClientConnState(*ccs)
if logger.V(2) && err != nil {
logger.Infof("error from balancer.UpdateClientConnState: %v", err)
}
errCh <- err
}
onFailure := func() { close(errCh) }
// UpdateClientConnState can race with Close, and when the latter wins, the
// serializer is closed, and the attempt to schedule the callback will fail.
// It is acceptable to ignore this failure. But since we want to handle the
// state update in a blocking fashion (when we successfully schedule the
// callback), we have to use the ScheduleOr method and not the MaybeSchedule
// method on the serializer.
ccb.serializer.ScheduleOr(uccs, onFailure)
return <-errCh
}
发起连接
addrConn 发起 connect,和服务器建立 tcp 连接
func (acbw *acBalancerWrapper) Connect() {
go acbw.ac.connect()
}
// connect starts creating a transport.
func (ac *addrConn) connect() error {
ac.mu.Lock()
// 连接状态检验
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
if ac.state != connectivity.Idle {
ac.mu.Unlock()
return nil
}
ac.resetTransportAndUnlock()
return nil
}
// resetTransportAndUnlock unconditionally connects the addrConn.
func (ac *addrConn) resetTransportAndUnlock() {
...
if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
return
}
...
}
// tryAllAddrs tries to create a connection to the addresses, and stop when at
// the first successful one. It returns an error if no address was successfully
// connected, or updates ac appropriately with the new transport.
func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
for _, addr := range addrs {
err := ac.createTransport(ctx, addr, copts, connectDeadline)
if err == nil {
return nil
}
}
// Couldn't connect to any address.
return firstConnErr
}
进入连接
拨号阶段,使用 net 包的 DialContext 进行 tcp 连接。连接成功后返回 net.Conn 用于
// createTransport creates a connection to addr. It returns an error if the
// address was not successfully connected, or updates ac appropriately with the
// new transport.
func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
...
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose)
...
return nil
}
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
}
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
...
return t, nil
}
帧设置
创建 http2Client,用于发送帧,接收帧
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
...
// 创建 http2Client 对象
t := &http2Client{}
...
// 接收服务器发送的消息
// Start the reader goroutine for incoming messages. Each transport has a
// dedicated goroutine which reads HTTP2 frames from the network. Then it
// dispatches the frame to the corresponding stream entity. When the
// server preface is received, readerErrCh is closed. If an error occurs
// first, an error is pushed to the channel. This must be checked before
// returning from this function.
readerErrCh := make(chan error, 1)
go t.reader(readerErrCh)
// 发送 ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
// Send connection preface to server.
n, err := t.conn.Write(clientPreface)
if err != nil {
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
return nil, err
}
if n != len(clientPreface) {
err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
return nil, err
}
var ss []http2.Setting
if t.initialWindowSize != defaultWindowSize {
ss = append(ss, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(t.initialWindowSize),
})
}
if opts.MaxHeaderListSize != nil {
ss = append(ss, http2.Setting{
ID: http2.SettingMaxHeaderListSize,
Val: *opts.MaxHeaderListSize,
})
}
// 发送客户端的设置信息
err = t.framer.fr.WriteSettings(ss...)
if err != nil {
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
return nil, err
}
// 发送窗口更新帧
// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
return nil, err
}
}
t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
if err := t.framer.writer.Flush(); err != nil {
return nil, err
}
// Block until the server preface is received successfully or an error occurs.
if err = <-readerErrCh; err != nil {
return nil, err
}
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
if err := t.loopy.run(); !isIOError(err) {
// Immediately close the connection, as the loopy writer returns
// when there are no more active streams and we were draining (the
// server sent a GOAWAY). For I/O errors, the reader will hit it
// after draining any remaining incoming data.
t.conn.Close()
}
close(t.writerDone)
}()
return t, nil
}
延伸阅读
- https://github.com/grpc/grpc-go/blob/master/Documentation/anti-patterns.md
- https://github.com/grpc/grpc/blob/master/doc/naming.md
- https://github.com/grpc/grpc/blob/master/doc/service_config.md
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。