balancer 的作用
balancer 可以认为是进程内的负载均衡器,作用于客户端一侧。
balancer 相关接口
Builder
工厂模式:用于构造 balancer,一个 ClientConn 对应于一个 Balancer。
type Builder interface {
Build(cc ClientConn, opts BuildOptions) Balancer
Name() string
}
type ClientConn interface {
UpdateState(State)
ResolveNow(resolver.ResolveNowOptions)
// Deprecated
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
RemoveSubConn(SubConn)
UpdateAddresses(SubConn, []resolver.Address)
Target() string
}
type State struct {
ConnectivityState connectivity.State
Picker Picker
}
const (
Idle State = iota
Connecting
Ready
TransientFailure
Shutdown
)
type Picker interface {
Pick(info PickInfo) (PickResult, error)
}
type PickInfo struct {
FullMethodName string
Ctx context.Context
}
type PickResult struct {
SubConn SubConn
Done func(DoneInfo)
Metadata metadata.MD
}
type SubConn interface {
Connect()
GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
Shutdown()
// Deprecated
UpdateAddresses([]resolver.Address)
}
Balancer
维护所有的 SubConn,每一个 SubConn 对应于一个 Address。 在 balancer 调用 UpdateClientConnState 更新状态时,最终内部调用的其实是 ClientConn 的 UpdateState 方法,更新了 ConnectivityState 和 Picker。Picker 的作用是挑选出一个 SubConn 用来建立 tcp 连接。
type Balancer interface {
UpdateClientConnState(ClientConnState) error
ResolverError(error)
Close()
// Deprecated
UpdateSubConnState(SubConn, SubConnState)
}
type ClientConnState struct {
ResolverState resolver.State
BalancerConfig serviceconfig.LoadBalancingConfig
}
type State struct {
Addresses []Address
Endpoints []Endpoint
ServiceConfig *serviceconfig.ParseResult
Attributes *attributes.Attributes
}
自定义 balancer
事实上 builder 可以直接使用 base.NewBalancerBuilder
进行创建,我们只需要提供一个 PickerBuilder 的实现即可。PickerBuilder 可以根据场景选择最合适的算法,比如轮询,加权轮询等等。实现好后要注册进 grpc 框架中 balancer.Register
。客户端初始化时默认使用的是 pick_first balancer。如果希望使用指定的 balancer,需要传入 service config 配置项。如下所示,是指定使用 round_robin balancer。如果想用我们自己实现的 balancer,则把里面的 round_robin
策略用我们初始化 Builder 时用的 name 做替换即可。
// This sets the initial balancing policy.
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder {
return &baseBuilder{
name: name,
pickerBuilder: pb,
config: config,
}
}
type PickerBuilder interface {
// Build returns a picker that will be used by gRPC to pick a SubConn.
Build(info PickerBuildInfo) balancer.Picker
}
func Register(b Builder) {
name := strings.ToLower(b.Name())
if name != b.Name() {
// TODO: Skip the use of strings.ToLower() to index the map after v1.59
// is released to switch to case sensitive balancer registry. Also,
// remove this warning and update the docstrings for Register and Get.
logger.Warningf("Balancer registered with name %q. grpc-go will be switching to case sensitive balancer registries soon", b.Name())
}
m[name] = b
}
grpc 框架内部的使用
下面是 balancer 初始化时的代码片段
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
...
balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder)
...
}
func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {
...
bw := &balancerWrapper{
builder: builder,
gsb: gsb,
lastState: balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
},
subconns: make(map[balancer.SubConn]bool),
}
...
newBalancer := builder.Build(bw, gsb.bOpts)
...
bw.Balancer = newBalancer
return bw, nil
}
下面是更新 picker 的代码片段,picker 中是保存着 SubConn 的,后续取出 SubConn 就可以直接建立连接。
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
...
ccb.cc.pickerWrapper.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
}
如何创建的 SubConn
虽然是接口类型,但全局只有一个 ccBalancerWrapper 的实现。主要逻辑是一层层的包装类,将 addrs 保存起来。
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
...
ac, err := ccb.cc.newAddrConnLocked(addrs, opts)
...
acbw := &acBalancerWrapper{
ccb: ccb,
ac: ac,
producers: make(map[balancer.ProducerBuilder]*refCountedProducer),
stateListener: opts.StateListener,
}
ac.acbw = acbw
return acbw, nil
}
func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
...
ac := &addrConn{
state: connectivity.Idle,
cc: cc,
addrs: copyAddresses(addrs),
scopts: opts,
dopts: cc.dopts,
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
resetBackoff: make(chan struct{}),
stateReadyChan: make(chan struct{}),
}
...
cc.conns[ac] = struct{}{}
return ac, nil
}
pick_first balancer
type pickfirstBalancer struct {
logger *internalgrpclog.PrefixLogger
// balancer 的状态,如Idle、Connecting、Ready、TransientFailure、Shutdown
state connectivity.State
cc balancer.ClientConn
subConn balancer.SubConn
}
picker 没有什么特殊的逻辑,因为 addrConn 默认的实现就是按顺序连接,第一个成功后就退出。
func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
return p.result, p.err
}
func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
...
for _, addr := range addrs {
...
err := ac.createTransport(ctx, addr, copts, connectDeadline)
if err == nil {
return nil
}
...
}
...
}
round_robin balancer
逻辑也很简单,就是轮询即可
type rrPicker struct {
// subConns is the snapshot of the roundrobin balancer when this picker was
// created. The slice is immutable. Each Get() will do a round robin
// selection from it and return the selected SubConn.
subConns []balancer.SubConn
next uint32
}
func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
subConnsLen := uint32(len(p.subConns))
nextIndex := atomic.AddUint32(&p.next, 1)
sc := p.subConns[nextIndex%subConnsLen]
return balancer.PickResult{SubConn: sc}, nil
}
延伸阅读
- https://pkg.go.dev/google.golang.org/grpc/balancer
- https://github.com/grpc/proposal/blob/master/A62-pick-first.md
- https://grpc.io/blog/grpc-load-balancing/
- https://github.com/grpc/grpc/blob/master/doc/service_config.md
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。