Go猜想录
大道至简,悟者天成
gRPC-Go -- #4 负载均衡 balancer

balancer 的作用

balancer 可以认为是进程内的负载均衡器,作用于客户端一侧。

balancer 相关接口

Builder

工厂模式:用于构造 balancer,一个 ClientConn 对应于一个 Balancer。

type Builder interface {
	Build(cc ClientConn, opts BuildOptions) Balancer
	Name() string
}

type ClientConn interface {
	UpdateState(State)
	ResolveNow(resolver.ResolveNowOptions)
	
	// Deprecated
	NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
	RemoveSubConn(SubConn)
	UpdateAddresses(SubConn, []resolver.Address)
	Target() string
}

type State struct {
	ConnectivityState connectivity.State
	Picker Picker
}

const (
	Idle State = iota
	Connecting
	Ready
	TransientFailure
	Shutdown
)

type Picker interface {
	Pick(info PickInfo) (PickResult, error)
}

type PickInfo struct {
	FullMethodName string
	Ctx context.Context
}

type PickResult struct {
	SubConn SubConn
	Done func(DoneInfo)
	Metadata metadata.MD
}

type SubConn interface {
	Connect()
	GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
	Shutdown()
	
	// Deprecated
	UpdateAddresses([]resolver.Address)
}

Balancer

维护所有的 SubConn,每一个 SubConn 对应于一个 Address。 在 balancer 调用 UpdateClientConnState 更新状态时,最终内部调用的其实是 ClientConn 的 UpdateState 方法,更新了 ConnectivityState 和 Picker。Picker 的作用是挑选出一个 SubConn 用来建立 tcp 连接。

type Balancer interface {
	UpdateClientConnState(ClientConnState) error
	ResolverError(error)
	Close()
 	
	// Deprecated
	UpdateSubConnState(SubConn, SubConnState)
}

type ClientConnState struct {
	ResolverState resolver.State
	BalancerConfig serviceconfig.LoadBalancingConfig
}

type State struct {
	Addresses []Address
	Endpoints []Endpoint
	ServiceConfig *serviceconfig.ParseResult
	Attributes *attributes.Attributes
}

自定义 balancer

事实上 builder 可以直接使用 base.NewBalancerBuilder 进行创建,我们只需要提供一个 PickerBuilder 的实现即可。PickerBuilder 可以根据场景选择最合适的算法,比如轮询,加权轮询等等。实现好后要注册进 grpc 框架中 balancer.Register。客户端初始化时默认使用的是 pick_first balancer。如果希望使用指定的 balancer,需要传入 service config 配置项。如下所示,是指定使用 round_robin balancer。如果想用我们自己实现的 balancer,则把里面的 round_robin 策略用我们初始化 Builder 时用的 name 做替换即可。

// This sets the initial balancing policy.
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), 
func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder {
	return &baseBuilder{
		name:          name,
		pickerBuilder: pb,
		config:        config,
	}
}

type PickerBuilder interface {
	// Build returns a picker that will be used by gRPC to pick a SubConn.
	Build(info PickerBuildInfo) balancer.Picker
}

func Register(b Builder) {
	name := strings.ToLower(b.Name())
	if name != b.Name() {
		// TODO: Skip the use of strings.ToLower() to index the map after v1.59
		// is released to switch to case sensitive balancer registry. Also,
		// remove this warning and update the docstrings for Register and Get.
		logger.Warningf("Balancer registered with name %q. grpc-go will be switching to case sensitive balancer registries soon", b.Name())
	}
	m[name] = b
}

grpc 框架内部的使用

下面是 balancer 初始化时的代码片段

func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
	...
			balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder)
	...
}
func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {
	...
	bw := &balancerWrapper{
		builder: builder,
		gsb:     gsb,
		lastState: balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		},
		subconns: make(map[balancer.SubConn]bool),
	}
	...
	newBalancer := builder.Build(bw, gsb.bOpts)
	...
	bw.Balancer = newBalancer
	return bw, nil
}

下面是更新 picker 的代码片段,picker 中是保存着 SubConn 的,后续取出 SubConn 就可以直接建立连接。

func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
	...
	ccb.cc.pickerWrapper.updatePicker(s.Picker)
	ccb.cc.csMgr.updateState(s.ConnectivityState)
}

如何创建的 SubConn

虽然是接口类型,但全局只有一个 ccBalancerWrapper 的实现。主要逻辑是一层层的包装类,将 addrs 保存起来。

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	...
	ac, err := ccb.cc.newAddrConnLocked(addrs, opts)
	...
	acbw := &acBalancerWrapper{
		ccb:           ccb,
		ac:            ac,
		producers:     make(map[balancer.ProducerBuilder]*refCountedProducer),
		stateListener: opts.StateListener,
	}
	ac.acbw = acbw
	return acbw, nil
}

func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
	...
	ac := &addrConn{
		state:          connectivity.Idle,
		cc:             cc,
		addrs:          copyAddresses(addrs),
		scopts:         opts,
		dopts:          cc.dopts,
		channelz:       channelz.RegisterSubChannel(cc.channelz, ""),
		resetBackoff:   make(chan struct{}),
		stateReadyChan: make(chan struct{}),
	}
	...
	cc.conns[ac] = struct{}{}
	return ac, nil
}

pick_first balancer

type pickfirstBalancer struct {
	logger  *internalgrpclog.PrefixLogger
	// balancer 的状态,如Idle、Connecting、Ready、TransientFailure、Shutdown
	state   connectivity.State
	cc      balancer.ClientConn
	subConn balancer.SubConn
}

picker 没有什么特殊的逻辑,因为 addrConn 默认的实现就是按顺序连接,第一个成功后就退出。

func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	return p.result, p.err
}
func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
	...
	for _, addr := range addrs {
		...
		err := ac.createTransport(ctx, addr, copts, connectDeadline)
		if err == nil {
			return nil
		}
		...
	}
	...
}

round_robin balancer

逻辑也很简单,就是轮询即可

type rrPicker struct {
	// subConns is the snapshot of the roundrobin balancer when this picker was
	// created. The slice is immutable. Each Get() will do a round robin
	// selection from it and return the selected SubConn.
	subConns []balancer.SubConn
	next     uint32
}

func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	subConnsLen := uint32(len(p.subConns))
	nextIndex := atomic.AddUint32(&p.next, 1)

	sc := p.subConns[nextIndex%subConnsLen]
	return balancer.PickResult{SubConn: sc}, nil
}

延伸阅读


知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。