Go猜想录
大道至简,悟者天成
gRPC-Go -- #2 连接建立流程

楔子

本节将介绍 grpc 客户端是如何跟 grpc 服务器端建立起连接的。

  • 服务端流程
    1. 初始化 grpc 服务器,注册提供的服务
    2. 启动 grpc 服务器,监听 tcp 请求
    3. 建立 tcp 连接
    4. 建立 http2 连接
    5. 消息交互
  • 客户端流程
    1. 初始化 grpc 客户端
    2. 发起 rpc 调用,resolver 解析服务器地址列表,balancer 选择目标地址
    3. 建立 tcp 连接
    4. 建立 http2 连接

服务器端

main 代码

启动 gRPC 服务器。

// server is used to implement helloworld.GreeterServer.
// It embeds pb.UnimplementedGreeterServer; no fields or methods of its own are
// declared in this excerpt.
type server struct {
	pb.UnimplementedGreeterServer
}

// main starts the example gRPC server: it opens a TCP listener on the
// configured port, registers the Greeter service, and serves requests on the
// listener until Serve returns an error.
func main() {
	flag.Parse()

	// Open a TCP listener on the port given by the port flag.
	listenAddr := fmt.Sprintf(":%d", *port)
	listener, err := net.Listen("tcp", listenAddr)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Create the gRPC server and register the service implementation on it.
	grpcServer := grpc.NewServer()
	pb.RegisterGreeterServer(grpcServer, &server{})
	log.Printf("server listening at %v", listener.Addr())

	// Start serving; terminate the process if Serve reports an error.
	if serveErr := grpcServer.Serve(listener); serveErr != nil {
		log.Fatalf("failed to serve: %v", serveErr)
	}
}

监听 tcp 请求

以阻塞方式监听客户端的 TCP 请求。

// Serve accepts incoming connections on lis in an endless loop and hands each
// accepted connection to handleRawConn on its own goroutine.
// (Excerpt: the handling of the Accept error is elided as "...".)
func (s *Server) Serve(lis net.Listener) error {
	// Loop forever, blocking until a client connects.
	for {
		// Use the net.Listener to accept the next client connection.
		rawConn, err := lis.Accept()
		...
		// Handle every accepted connection on a new goroutine.
		go func() {
			s.handleRawConn(lis.Addr().String(), rawConn)
		}()
	}
}

处理 tcp 请求

在接收到请求后,服务器与客户端建立 http2 连接。

// handleRawConn turns an accepted raw connection into an HTTP/2 server
// transport and then serves streams on it from a new goroutine.
// lisAddr is the listener address the connection was accepted on; it is passed
// to removeConn once stream serving ends.
// (Excerpt: some steps are elided as "...".)
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
	// If the gRPC server has already been asked to quit, drop the connection.
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	// Put a deadline on the connection for the duration of the handshake.
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))

	// Complete the HTTP/2 handshake and create the server transport.
	// This exchanges the initial frame settings with the client, such as the
	// frame size and window size.
	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(rawConn)
	// Clear the deadline again; the zero time.Time means "no deadline".
	rawConn.SetDeadline(time.Time{})
	if st == nil {
		return
	}

	...
	// Serve streams on a new goroutine; remove the connection when done.
	go func() {
		s.serveStreams(context.Background(), st, rawConn)
		s.removeConn(lisAddr, st)
	}()
}

http2 握手

创建 http2Server

在 gRPC 连接的 HTTP/2 握手阶段,客户端向服务器发送帧,服务器响应并发送其帧大小和初始化窗口大小等信息。同时,服务器接收客户端的帧信息并进行相应设置。通过这些交互,双方确定了各自的数据接收能力,然后进入流的处理阶段。

// newHTTP2Transport creates the server-side transport for connection c by
// calling transport.NewServerTransport and returns it.
// (Excerpt: construction of config and the error handling are elided.)
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
	...
	st, err := transport.NewServerTransport(c, config)
	...
	return st
}

// NewServerTransport performs the server side of the HTTP/2 connection setup
// on conn and returns the resulting transport. As visible in this excerpt it
// runs the optional credentials handshake, writes the server's SETTINGS frame
// (plus a WINDOW_UPDATE when needed), validates the client preface, reads the
// client's SETTINGS frame, and starts the loopy writer and keepalive
// goroutines. Lines shown as "..." are elided from the original source.
func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	// Credentials handshake, only when credentials are configured.
	// NOTE(review): rawConn and authInfo are not declared in this excerpt;
	// presumably rawConn refers to the incoming conn — confirm against upstream.
	if config.Credentials != nil {
		conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
	}
	...
	// Create the HTTP/2 framer; internally this calls http2.NewFramer.
	framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
	// Build the initial settings, a slice of http2.Setting.
	// It includes the maximum frame size, the maximum number of streams,
	// the initial window size, and so on.
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	...
	// Send the settings to the client; the client updates its local
	// configuration according to them.
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Send a window update frame.
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	...
	t := &http2Server{}
	...

	// Read the preface (PRI ...) sent by the client and compare it with the
	// clientPreface value held by the server to validate it.
	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		...
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}


	// Read the next frame from the client, treat it as a SETTINGS frame, and
	// apply it to the local configuration.
	frame, err := t.framer.fr.ReadFrame()
	...
	sf, ok := frame.(*http2.SettingsFrame)
	...
	t.handleSettings(sf)

	// Start the frame writer (loopy writer) on its own goroutine.
	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
		err := t.loopy.run()
		...
	}()
	// Start keepalive on its own goroutine.
	go t.keepalive()
	return t, nil
}

客户端初始化

main 代码

在 grpc-go v1.63 版本之后,新建客户端使用 grpc.NewClient,而不是 grpc.Dial 或 grpc.DialContext;后两个方法已标记为 Deprecated,不推荐使用。

NewClient 用于从客户端应用程序创建与 gRPC 服务器的虚拟连接。它接收一个目标 URI(表示逻辑后端服务的名称并解析为一个或多个物理地址)和一组选项,返回一个 ClientConn 对象,表示与服务器的虚拟连接。ClientConn 包含一个或多个实际的连接,并在连接中断时自动尝试重连。

NewClient 和之前 Dial 的区别:

  1. 不会执行 I/O 操作,即新建客户端时不会立刻开始连接,这样和其他语言的 gRPC 工作方式保持了一致。
  2. 使用 ClientConn 进行 RPC 调用将自动使其连接。
  3. 默认 resolver 使用 dns,之前的默认 resolver 使用的是 passthrough
// main creates a ClientConn with grpc.NewClient, builds a Greeter client on
// top of it, and issues the SayHello RPC.
// (Excerpt: error handling and the setup of ctx are elided.)
func main() {
	// Create the ClientConn from the target URI and the dial options.
	conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	...
	c := pb.NewGreeterClient(conn)
	...
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
}

新建 Client

建立 grpc 管道

// NewClient creates a ClientConn for target configured by opts. As visible in
// this excerpt it applies the global and caller-supplied dial options, picks
// the resolver builder, chains interceptors, validates the transport
// credentials, parses the default service config, initializes the authority,
// registers with channelz, and finally initializes the connectivity state
// manager, picker wrapper, and idleness manager before returning.
// It returns an error if resolver selection, credential validation, default
// service config parsing, or authority initialization fails.
// (Excerpt: lines shown as "..." are elided from the original source.)
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
	// Create the ClientConn object.
	cc := &ClientConn{
		target: target,
		// Set of addrConns; one target may be served by several servers.
		conns:  make(map[*addrConn]struct{}),
		// Default dial options, e.g. the scheme used for the target, read and
		// write buffer sizes, the reconnect behavior, and so on.
		dopts:  defaultDialOptions(),
	}

	...

	// Apply the global dial options unless they are disabled.
	if !disableGlobalOpts {
		for _, opt := range globalDialOptions {
			opt.apply(&cc.dopts)
		}
	}

	// Apply the dial options passed in by the caller.
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Set up the resolver.
	// Determine the resolver to use.
	if err := cc.initParsedTargetAndResolverBuilder(); err != nil {
		return nil, err
	}

	...

	// Set up the interceptors.
	// This covers the ones supplied by the caller through:
	// grpc.WithStreamInterceptor()
	// grpc.WithChainStreamInterceptor()
	// grpc.WithUnaryInterceptor()
	// grpc.WithChainUnaryInterceptor
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Validate the credentials.
	// This covers the ones supplied by the caller through:
	// grpc.WithInsecure()
	// grpc.WithPerRPCCredentials()
	// grpc.WithTransportCredentials()
	if err := cc.validateTransportCredentials(); err != nil {
		return nil, err
	}

	// Parse the ServiceConfig.
	// This covers the one supplied by the caller through:
	// grpc.WithDefaultServiceConfig()
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	// Keep a copy of the keepalive parameters on the ClientConn.
	cc.mkp = cc.dopts.copts.KeepaliveParams

	if err = cc.initAuthority(); err != nil {
		return nil, err
	}

	// Register ClientConn with channelz. Note that this is only done after
	// channel creation cannot fail.
	cc.channelzRegistration(target)
	channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", cc.parsedTarget)
	channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)

	// Initialize csMgr, which holds the connectivity state,
	// e.g. Idle, Connecting, Ready, TransientFailure, Shutdown.
	cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
	cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)

	cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)

	// Initialize the wrappers and conns; the recorded state is idle.
	cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
	cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)

	return cc, nil
}

设置自定义 options

Go 的惯用法(functional options 模式):先给出一份默认设置,再通过可变的选项参数来修改默认设置。

// Apply every caller-supplied DialOption to the ClientConn's dial options, in
// the order the options were passed.
for _, opt := range opts {
	opt.apply(&cc.dopts)
}

// DialOption configures how we set up the connection.
// An implementation mutates the dialOptions value it is given in apply.
type DialOption interface {
	apply(*dialOptions)
}
// funcDialOption wraps a function that modifies dialOptions into an
// implementation of the DialOption interface.
type funcDialOption struct {
	// f is the wrapped function; apply calls it with the dialOptions to modify.
	f func(*dialOptions)
}

// apply implements DialOption by invoking the wrapped function on do.
func (fdo *funcDialOption) apply(do *dialOptions) {
	fdo.f(do)
}

// newFuncDialOption returns a funcDialOption that wraps f, so that a plain
// function modifying dialOptions can be used as a DialOption.
func newFuncDialOption(f func(*dialOptions)) *funcDialOption {
	return &funcDialOption{f: f}
}

设置 interceptor

洋葱模型,将所有的 interceptor 组装到一起

// chainUnaryClientInterceptors collapses every configured unary client
// interceptor into a single interceptor and stores it in cc.dopts.unaryInt.
// When no interceptor is configured the stored value is nil; when exactly one
// is configured it is stored as-is.
func chainUnaryClientInterceptors(cc *ClientConn) {
	chain := cc.dopts.chainUnaryInts
	// dopts.unaryInt, when set, goes in front of the chained interceptors so
	// that it runs before any of them.
	if first := cc.dopts.unaryInt; first != nil {
		chain = append([]UnaryClientInterceptor{first}, chain...)
	}

	var combined UnaryClientInterceptor
	switch len(chain) {
	case 0:
		combined = nil
	case 1:
		combined = chain[0]
	default:
		// Invoke the head of the chain and hand it an invoker that continues
		// with the remaining interceptors.
		combined = func(ctx context.Context, method string, req, reply any, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			next := getChainUnaryInvoker(chain, 0, invoker)
			return chain[0](ctx, method, req, reply, cc, next, opts...)
		}
	}
	cc.dopts.unaryInt = combined
}

创建 resolver builder

parseTarget 的三个属性:

  • Scheme string: 如 dns,example,passthrough 等,如 dns 表示该解析器底层是通过调用dns服务来解析用户指定的地址的
  • Authority string:
  • Endpoint string: 如 localhost:50051,或域名 foo.bar 等

另外,grpc.WithResolvers 的优先级高于 resolver.Register。
// initParsedTargetAndResolverBuilder parses cc.target and selects the resolver
// builder for it, storing both on cc. The target is first tried as given; if
// it cannot be parsed or no resolver is found for its scheme, the target is
// retried with the default scheme prepended. It returns an error when the
// fallback target cannot be parsed or no resolver exists for its scheme.
func (cc *ClientConn) initParsedTargetAndResolverBuilder() error {
	logger.Infof("original dial target is: %q", cc.target)

	var builder resolver.Builder

	// First attempt: use the target exactly as the caller wrote it.
	target, err := parseTarget(cc.target)
	if err == nil {
		builder = cc.getResolver(target.URL.Scheme)
		if builder != nil {
			cc.parsedTarget = target
			cc.resolverBuilder = builder
			return nil
		}
	}

	// Reaching this point means the dial target either had no scheme or named
	// a scheme with no registered resolver. Fall back to the default scheme,
	// except when a custom dialer is specified, in which case the passthrough
	// scheme should always be used. Either way, any global default overridden
	// by the user must be respected.
	fallbackScheme := cc.dopts.defaultScheme
	if internal.UserSetDefaultScheme {
		fallbackScheme = resolver.GetDefaultScheme()
	}

	target, err = parseTarget(fallbackScheme + ":///" + cc.target)
	if err != nil {
		return err
	}
	builder = cc.getResolver(target.URL.Scheme)
	if builder == nil {
		return fmt.Errorf("could not get resolver for default scheme: %q", target.URL.Scheme)
	}
	cc.parsedTarget = target
	cc.resolverBuilder = builder
	return nil
}

校验 credential

如果想设置此次的 tcp 连接为非安全连接的话,需要显式地设置 grpc.WithTransportCredentials(insecure.NewCredentials())。如果既没有设置 TransportCredentials 也没有设置 CredsBundle,校验不会默认放行,而是直接返回 errNoTransportSecurity 错误,也就是说 gRPC 默认要求配置传输安全。

// validateTransportCredentials checks that the credential-related dial options
// are consistent. It returns an error when neither transport credentials nor a
// credentials bundle is set, when both are set, when the bundle carries no
// transport credentials, or when the transport is "insecure" while some
// per-RPC credential requires transport security. Otherwise it returns nil.
func (cc *ClientConn) validateTransportCredentials() error {
	hasCreds := cc.dopts.copts.TransportCredentials != nil
	hasBundle := cc.dopts.copts.CredsBundle != nil

	switch {
	case !hasCreds && !hasBundle:
		return errNoTransportSecurity
	case hasCreds && hasBundle:
		return errTransportCredsAndBundle
	case hasBundle && cc.dopts.copts.CredsBundle.TransportCredentials() == nil:
		return errNoTransportCredsInBundle
	}

	// Exactly one source of transport credentials remains at this point.
	creds := cc.dopts.copts.TransportCredentials
	if creds == nil {
		creds = cc.dopts.copts.CredsBundle.TransportCredentials()
	}
	if creds.Info().SecurityProtocol != "insecure" {
		return nil
	}

	// On an insecure transport, no per-RPC credential may demand security.
	for _, perRPC := range cc.dopts.copts.PerRPCCredentials {
		if perRPC.RequireTransportSecurity() {
			return errTransportCredentialsMissing
		}
	}
	return nil
}

解析 ServiceConfig

// When a default service config was supplied as raw JSON, parse it. A parse
// error aborts client creation; otherwise the parsed config is stored as
// cc.dopts.defaultServiceConfig.
if cc.dopts.defaultServiceConfigRawJSON != nil {
	scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON, cc.dopts.maxCallAttempts)
	if scpr.Err != nil {
		return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
	}
	// The result of the type assertion is deliberately not checked here.
	cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
}
// jsonSC lists the top-level service config fields handled by this code.
// Every field is a pointer.
// NOTE(review): presumably this is the target of JSON decoding and a nil
// pointer means the field was absent from the input — confirm against upstream.
// TODO(lyuxuan): delete this struct after cleaning up old service config implementation.
type jsonSC struct {
	LoadBalancingPolicy *string
	LoadBalancingConfig *json.RawMessage
	MethodConfig        *[]jsonMC
	RetryThrottling     *retryThrottlingPolicy
	HealthCheckConfig   *healthCheckConfig
}

自定义 Dialer

在向 grpc 服务器端发起连接请求时会调用这个函数,若没有指定则使用默认的 dialer

// WithContextDialer returns a DialOption that stores f as the dialer in the
// connect options; f receives a context and an address string and returns the
// established net.Conn or an error.
func WithContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
	setDialer := func(o *dialOptions) {
		o.copts.Dialer = f
	}
	return newFuncDialOption(setDialer)
}
// dial opens a connection to addr. When a custom dialer fn is supplied it is
// always used; otherwise the connection goes through proxyDial (for "tcp" with
// useProxy set) or through the internal net dialer with TCP keepalive.
// grpcUA is forwarded to proxyDial only.
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
	address := addr.Addr
	networkType, ok := networktype.Get(addr)

	if fn == nil {
		// No custom dialer: derive the network type from the address when the
		// resolver did not attach one.
		if !ok {
			networkType, address = parseDialTarget(address)
		}
		if networkType == "tcp" && useProxy {
			return proxyDial(ctx, address, grpcUA)
		}
		return internal.NetDialerWithTCPKeepalive().DialContext(ctx, networkType, address)
	}

	// Custom dialer with a unix target gets special treatment. Historically
	// there was no unix resolver, so unix targets went through the passthrough
	// resolver and users supplied a custom dialer that expected to receive the
	// original dial target. A unix resolver exists now, but with a custom
	// dialer the old behavior is kept with respect to the address handed to
	// the dialer.
	if networkType != "unix" || strings.HasPrefix(address, "\x00") {
		return fn(ctx, address)
	}
	// Supported unix targets are either "unix://absolute-path" or
	// "unix:relative-path".
	if filepath.IsAbs(address) {
		return fn(ctx, "unix://"+address)
	}
	return fn(ctx, "unix:"+address)
}

客户端发起调用

发起调用

发起 rpc 调用

// main (excerpt) shows only the RPC call itself: SayHello is invoked on the
// generated Greeter client c.
func main() {
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
}

// SayHello forwards the call to the underlying connection's Invoke with the
// Greeter SayHello full method name.
// NOTE(review): this is an excerpt — out and cOpts are not declared here and
// the return statement is omitted; confirm against the generated code.
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
	err := c.cc.Invoke(ctx, Greeter_SayHello_FullMethodName, in, out, cOpts...)
}

// Invoke performs the RPC named by method, sending args and storing the
// response in reply. In this excerpt it delegates directly to invoke and
// returns its error.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply any, opts ...CallOption) error {
	return invoke(ctx, method, args, reply, cc, opts...)
}

// invoke carries out a unary RPC in three steps: create a client stream for
// method, send req on it, then receive the response into reply. The first
// error encountered is returned.
func invoke(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {
	stream, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}
	if sendErr := stream.SendMsg(req); sendErr != nil {
		return sendErr
	}
	return stream.RecvMsg(reply)
}

// newClientStream creates the client stream for an RPC. The part shown here
// first notifies the idleness manager that a call is beginning and returns
// early if that fails.
// (Excerpt: the remainder of the function is elided.)
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	if err := cc.idlenessMgr.OnCallBegin(); err != nil {
		return nil, err
	}
	...
}


创建 resolver

Resolver 的作用是监视 target 的变化,在发生变化时更新 resolver.State(包含 address 和 service config)。 在上面一连串的调用后,会触发 OnCallBegin,其中调用了 resolverBuilder.Build,初始化了 resolver。在 resolver 初始化时会调用 clientConn 的 UpdateState 方法,在 UpdateState 中会调用 updateResolverStateAndUnlock,引发了 balancer 的初始化动作。

// start schedules a callback on the serializer that builds the resolver with
// the ClientConn's resolver builder and parsed target, then waits on errCh
// for the result.
// NOTE(review): this excerpt omits the declarations of err and opts and the
// code that sends on or closes errCh — confirm against upstream.
func (ccr *ccResolverWrapper) start() error {
	errCh := make(chan error)
	ccr.serializer.TrySchedule(func(ctx context.Context) {
		ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts)
	})
	return <-errCh
}
// UpdateState records the resolver state s on the wrapper and forwards it to
// the ClientConn via updateResolverStateAndUnlock. It returns nil without
// doing anything when the wrapper is already closed.
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
	// Take the ClientConn lock first, then the wrapper lock. The ClientConn
	// lock is still held when updateResolverStateAndUnlock is called below.
	ccr.cc.mu.Lock()
	ccr.mu.Lock()
	if ccr.closed {
		ccr.mu.Unlock()
		ccr.cc.mu.Unlock()
		return nil
	}
	// When the resolver supplied only Addresses, derive one Endpoint per
	// address. The address's BalancerAttributes move to the endpoint's
	// Attributes and are cleared on the address inside the endpoint.
	if s.Endpoints == nil {
		s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
		for _, a := range s.Addresses {
			ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
			ep.Addresses[0].BalancerAttributes = nil
			s.Endpoints = append(s.Endpoints, ep)
		}
	}
	ccr.addChannelzTraceEvent(s)
	ccr.curState = s
	ccr.mu.Unlock()
	return ccr.cc.updateResolverStateAndUnlock(s, nil)
}
// updateResolverStateAndUnlock applies a resolver update (state s or error
// err) to the ClientConn: it selects and applies the service config, then
// pushes the new state to the balancer wrapper. Every return path in this
// function unlocks cc.mu. It returns balancer.ErrBadResolverState when err is
// non-nil or the service config in s is unusable; otherwise it returns the
// error from the balancer update.
func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig()

		cc.balancerWrapper.resolverError(err)

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	var ret error
	// Different branches run depending on how the service config is set up.
	// If service configs are disabled via grpc.WithDisableServiceConfig(), or
	// the resolver state carries no service config, maybeApplyDefaultServiceConfig
	// runs, which in turn calls applyServiceConfigAndBalancer.
	// If the resolver state carries a valid service config,
	// applyServiceConfigAndBalancer is called with it directly.
	if cc.dopts.disableServiceConfig {
		channelz.Infof(logger, cc.channelz, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
		cc.maybeApplyDefaultServiceConfig()
	} else if s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig()
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			configSelector := iresolver.GetConfigSelector(s)
			if configSelector != nil {
				if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
					channelz.Infof(logger, cc.channelz, "method configs in service config will be ignored due to presence of config selector")
				}
			} else {
				configSelector = &defaultConfigSelector{sc}
			}
			cc.applyServiceConfigAndBalancer(sc, configSelector)
		} else {
			ret = balancer.ErrBadResolverState
			if cc.sc == nil {
				// Apply the failing LB only if we haven't received valid service config
				// from the name resolver in the past.
				cc.applyFailingLBLocked(s.ServiceConfig)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	var balCfg serviceconfig.LoadBalancingConfig
	if cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig
	}
	// Capture the balancer wrapper while still holding the lock, then release
	// the lock before calling into the balancer.
	bw := cc.balancerWrapper
	cc.mu.Unlock()

	// Push the new resolver state and balancer config to the balancer.
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}

初始化 service config,若未指定则使用默认的 emptyServiceConfig,而 emptyServiceConfig 在初始化时使用的就是 pick first 的 balancer 实现。

// maybeApplyDefaultServiceConfig applies a service config when the resolver
// did not provide a usable one. The order of preference is: the service config
// already in effect on cc, then the default service config from the dial
// options, then emptyServiceConfig.
func (cc *ClientConn) maybeApplyDefaultServiceConfig() {
	switch {
	case cc.sc != nil:
		// Re-apply the config already in effect, without a config selector.
		cc.applyServiceConfigAndBalancer(cc.sc, nil)
	case cc.dopts.defaultServiceConfig != nil:
		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig})
	default:
		cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig})
	}
}
// parseServiceConfig parses the service config JSON js. The fragment shown
// here only highlights that pickfirst.Name is chosen when c is nil.
// NOTE(review): heavily abridged excerpt — c is not declared here, name is
// unused, and there is no return statement; confirm against upstream.
func parseServiceConfig(js string, maxAttempts int) *serviceconfig.ParseResult {
	...
	if c == nil {
		name := pickfirst.Name
	}
}

创建 balancer

Balancer 的作用是从 gRPC 接收输入,管理 SubConns,并收集和聚合连接状态。它还生成并更新 gRPC 使用的 Picker,以选择用于 RPC 的 SubConns。

初始化的时候只创建了 ccBalancerWrapper,底层的 balancer 并未创建。底层的 balancer 会在调用 updateClientConnState 时创建。

// newCCBalancerWrapper returns a balancer wrapper for cc in idle state. The
// underlying balancer is not built here; that happens only once
// updateClientConnState() is called.
func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
	ctx, cancel := context.WithCancel(cc.ctx)

	// Build options handed to the balancer, taken from the ClientConn and its
	// dial options.
	buildOpts := balancer.BuildOptions{
		DialCreds:       cc.dopts.copts.TransportCredentials,
		CredsBundle:     cc.dopts.copts.CredsBundle,
		Dialer:          cc.dopts.copts.Dialer,
		Authority:       cc.authority,
		CustomUserAgent: cc.dopts.copts.UserAgent,
		ChannelzParent:  cc.channelz,
		Target:          cc.parsedTarget,
		MetricsRecorder: cc.metricsRecorderList,
	}

	// The serializer runs on a context derived from cc.ctx; cancel is kept so
	// the serializer can be stopped later.
	ccb := &ccBalancerWrapper{
		cc:               cc,
		opts:             buildOpts,
		serializer:       grpcsync.NewCallbackSerializer(ctx),
		serializerCancel: cancel,
	}
	ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts)
	return ccb
}

配置变化时更新 balancer

// updateClientConnState forwards ccs to the wrapped balancer from a callback
// run on the serializer, and blocks until that callback has finished or could
// not be scheduled. It returns the error from the balancer's
// UpdateClientConnState, or nil when the callback was skipped or scheduling
// failed (errCh is closed without a value in those cases).
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
	errCh := make(chan error)
	uccs := func(ctx context.Context) {
		// Closing errCh on exit unblocks the receive below on the early-return
		// path, where nothing is sent.
		defer close(errCh)
		if ctx.Err() != nil || ccb.balancer == nil {
			return
		}
		// Get the balancer name and record a switch if it changed.
		name := gracefulswitch.ChildName(ccs.BalancerConfig)
		if ccb.curBalancerName != name {
			ccb.curBalancerName = name
			channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name)
		}
		err := ccb.balancer.UpdateClientConnState(*ccs)
		if logger.V(2) && err != nil {
			logger.Infof("error from balancer.UpdateClientConnState: %v", err)
		}
		errCh <- err
	}
	onFailure := func() { close(errCh) }

	// UpdateClientConnState can race with Close, and when the latter wins, the
	// serializer is closed, and the attempt to schedule the callback will fail.
	// It is acceptable to ignore this failure. But since we want to handle the
	// state update in a blocking fashion (when we successfully schedule the
	// callback), we have to use the ScheduleOr method and not the MaybeSchedule
	// method on the serializer.
	ccb.serializer.ScheduleOr(uccs, onFailure)
	return <-errCh
}

发起连接

addrConn 发起 connect,和服务器建立 tcp 连接

// Connect starts the wrapped addrConn's connect on a new goroutine and returns
// immediately; the error returned by connect is discarded.
func (acbw *acBalancerWrapper) Connect() {
	go acbw.ac.connect()
}
// connect begins creating a transport for the addrConn, but only when it is
// currently idle. It returns errConnClosing if the addrConn is shut down and
// nil in every other case.
func (ac *addrConn) connect() error {
	ac.mu.Lock()
	// Dispatch on the current connectivity state while holding the lock.
	switch ac.state {
	case connectivity.Shutdown:
		ac.mu.Unlock()
		return errConnClosing
	case connectivity.Idle:
		// The lock is still held here; resetTransportAndUnlock takes over
		// from this point (per its name it is expected to release ac.mu).
		ac.resetTransportAndUnlock()
		return nil
	default:
		// Any other state: leave the addrConn untouched.
		ac.mu.Unlock()
		return nil
	}
}
// resetTransportAndUnlock unconditionally connects the addrConn.
// The part shown here tries the candidate addresses via tryAllAddrs and
// returns early when none of them could be connected.
// (Excerpt: setup of acCtx, addrs, connectDeadline and the success path are
// elided.)
func (ac *addrConn) resetTransportAndUnlock() {
	...
	if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
		return
	}
	...
}
// tryAllAddrs tries to create a connection to the addresses, and stop when at
// the first successful one. It returns an error if no address was successfully
// connected, or updates ac appropriately with the new transport.
// NOTE(review): abridged excerpt — copts and firstConnErr are not declared
// here; confirm against upstream.
func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
	// Try the addresses in order; the first success ends the search.
	for _, addr := range addrs {
		err := ac.createTransport(ctx, addr, copts, connectDeadline)
		if err == nil {
			return nil
		}
	}

	// Couldn't connect to any address.
	return firstConnErr
}

进入连接

拨号阶段,使用 net 包的 DialContext 进行 tcp 连接。连接成功后返回 net.Conn,用于后续建立 HTTP/2 连接并收发帧。

// createTransport creates a connection to addr. It returns an error if the
// address was not successfully connected, or updates ac appropriately with the
// new transport.
// The part shown here creates the transport via transport.NewClientTransport.
// (Excerpt: setup of connectCtx and onClose, error handling, and the update of
// ac are elided.)
func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
	...
	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose)
	...
	return nil
}
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
// It passes all of its arguments through to newHTTP2Client unchanged.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
	return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
}
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
// The part shown here is the dialing step: the connection is opened with dial,
// using connectCtx and the dialer, proxy flag and user agent from opts.
// (Excerpt: the rest of the function is elided.)
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
	scheme := "http"
	ctx, cancel := context.WithCancel(ctx)
	conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
	...
	return t, nil
}

帧设置

创建 http2Client,用于发送帧,接收帧

// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
// The part shown here is the HTTP/2 setup on the established connection:
// start the reader goroutine, write the client preface, send the client's
// SETTINGS frame (and a WINDOW_UPDATE when needed), flush, wait for the server
// preface, and start the loopy writer.
// (Excerpt: lines shown as "..." are elided from the original source.)
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
	...
	// Create the http2Client object.
	t := &http2Client{}
	...

	// Receive the messages sent by the server.
	// Start the reader goroutine for incoming messages. Each transport has a
	// dedicated goroutine which reads HTTP2 frames from the network. Then it
	// dispatches the frame to the corresponding stream entity.  When the
	// server preface is received, readerErrCh is closed.  If an error occurs
	// first, an error is pushed to the channel.  This must be checked before
	// returning from this function.
	readerErrCh := make(chan error, 1)
	go t.reader(readerErrCh)
	

	// Send the client preface, "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".
	// Send connection preface to server.
	n, err := t.conn.Write(clientPreface)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
		return nil, err
	}
	// A short write of the preface is treated as an error as well.
	if n != len(clientPreface) {
		err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
		return nil, err
	}
	// Collect only the settings that need to be announced: the initial window
	// size when it differs from defaultWindowSize, and the max header list
	// size when one is configured.
	var ss []http2.Setting

	if t.initialWindowSize != defaultWindowSize {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(t.initialWindowSize),
		})
	}
	if opts.MaxHeaderListSize != nil {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *opts.MaxHeaderListSize,
		})
	}
	// Send the client's settings.
	err = t.framer.fr.WriteSettings(ss...)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
		return nil, err
	}
	// Send a window update frame.
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
			err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
			return nil, err
		}
	}

	t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)

	// Flush the framer's writer so the frames written above are sent.
	if err := t.framer.writer.Flush(); err != nil {
		return nil, err
	}
	// Block until the server preface is received successfully or an error occurs.
	if err = <-readerErrCh; err != nil {
		return nil, err
	}
	// Start the loopy writer on its own goroutine; writerDone is closed once
	// it has stopped.
	go func() {
		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
		if err := t.loopy.run(); !isIOError(err) {
			// Immediately close the connection, as the loopy writer returns
			// when there are no more active streams and we were draining (the
			// server sent a GOAWAY).  For I/O errors, the reader will hit it
			// after draining any remaining incoming data.
			t.conn.Close()
		}
		close(t.writerDone)
	}()
	return t, nil
}

延伸阅读


知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。