Go猜想录
大道至简,悟者天成
gRPC-Go -- #3 服务发现 resolver

resolver 是什么

resolver 可以简单的看作是一个 map[service-name][]backend-ip 结构,它接收一个服务名,然后返回一组可以提供服务的后端 ip 地址。默认使用的 resolver 是 dns resolver。resolver 的主要作用是提供客户端的负载均衡能力。在 grpc-go 中, resolver 相关的内容都是面向接口编程的,所以我们可以轻易的实现自己的 resolver,注册进框架里使用。下面通过一个 example resolver 的例子来讲解如何实现一个 resolver。

resolver 初始化

  • grpc-go 框架会自动注册如下 resolver,可直接使用
    • pass-through resolver
    • unix resolver
    • dns resolver
  • 其它 resolver 需由使用者手动注册
    • xds resolver
    • manual resolver
    • example resolver
    // clientconn.go
	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
	_ "google.golang.org/grpc/internal/resolver/unix"        // To register unix resolver.
	_ "google.golang.org/grpc/resolver/dns"                  // To register dns resolver.

example resolver

grpc-go 提供的 resolver 实现示例

target

当新建一个 grpc 客户端的时候,会传入一个 target,target 的格式要符合 URI 规范,如下图所示。grpc 内部会保存一个 map[string]Builder 的结构,key 是 scheme,value 是 resolver builder。这个 key 正是和 URI 的 scheme 保持一致的,也就是说 grpc 是通过 URI 的 scheme 找到对应的 resolver builder,然后再进行后面的操作的。

// Scheme://Authority/Endpoint
type Target struct {
   Scheme    string  // 一种解析器对应一个 Scheme 值
   Authority string
   Endpoint  string
}

resolver builder

resolver builder 采用的是工厂模式,作用是用于构建 resolver,在初始化时就会生成 resolver builder。直到触发 rpc 调用时,才会生成 resolver,初始化 resolver 过程中就会解析出一组可以提供服务的后端 ip 地址,然后通过 ClientConn 更新到 grpc 内部。在后续过程中,如果有地址发生变化,会调用 ResolveNow 重新进行解析,也是通过 ClientConn 更新到 grpc 内部。resolver 也可以有主动监视地址变动然后自动更新的能力。

type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	Scheme() string
}

resolver

作用:

  1. 解析 target 获得一组可以提供服务的后端 ip 地址
    • 如何解析呢?
    • 直接写死:如 passthrough resolver、manual resolver
    • dns 解析:如 dns resolver
  2. 得到服务地址列表后,更新解析器的状态
    • 触发底层平衡器流程,和 grpc 服务器进行 tcp 连接
    • r.cc.UpdateState(resolver.State{Addresses: addrs})
type Resolver interface {
	ResolveNow(ResolveNowOptions)
	Close()
}

具体实现

实现一个 resolver 拢共分三步:

  1. 实现 builder 接口
    • Build
      • 构建 resolver 实例对象,初始化后端服务器的地址列表
      • 更新 resolver 状态,触发 balancer 运行
    • Scheme
      • 返回 resolver 的名称
  2. 实现 resolver 接口
    • ResolveNow
      • 再次解析 target
    • Close
      • 关闭 resolver
  3. 注册到 grpc 内部
    • resolver.Register(&exampleResolverBuilder{})
func init() {
	// 注册自定义 resolver
	resolver.Register(&exampleResolverBuilder{})
}

const (
	exampleScheme      = "example"
	exampleServiceName = "resolver.example.grpc.io"

	backendAddr = "localhost:50051"
)

type exampleResolverBuilder struct{}

func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
	r := &exampleResolver{
		target: target,
		cc:     cc,
		// 直接写死了服务地址
		addrsStore: map[string][]string{
			exampleServiceName: {backendAddr},
		},
	}
	// 第一次解析
	r.start()
	return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return exampleScheme }

type exampleResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	addrsStore map[string][]string
}

func (r *exampleResolver) start() {
	addrStrs := r.addrsStore[r.target.Endpoint()]
	addrs := make([]resolver.Address, len(addrStrs))
	for i, s := range addrStrs {
		addrs[i] = resolver.Address{Addr: s}
	}
	// 更新 resolver state
	r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (*exampleResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (*exampleResolver) Close()                                {}

pass-through resolver

  • pass-through resolver 在使用 DialContext 函数建立客户端时是默认的 resolver,不过这种创建 grpc 客户端的方式已经 deprecated,不推荐使用。最佳实践是使用 NewClient , NewClient 使用的 resolver 默认是 dns resolver。
  • pass-through resolver 使用传入的 target.Endpoint 作为 grpc 服务器端的地址,内部实际上没有任何解析的工作。
const scheme = "passthrough"

type passthroughBuilder struct{}

func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	if target.Endpoint() == "" && opts.Dialer == nil {
		return nil, errors.New("passthrough: received empty target in Build()")
	}
	// target 如 {“Scheme”:“passthrough”, “Endpoint”:“localhost:50051”}
	r := &passthroughResolver{
		target: target,
		cc:     cc,
	}
	// 初始化地址
	r.start()
	return r, nil
}

func (*passthroughBuilder) Scheme() string {
	return scheme
}

type passthroughResolver struct {
	target resolver.Target
	cc     resolver.ClientConn
}

func (r *passthroughResolver) start() {
	// 更新 ClientConn 的 resolver.State
	r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint()}}})
}

func (*passthroughResolver) ResolveNow(resolver.ResolveNowOptions) {}

func (*passthroughResolver) Close() {}

func init() {
	// 注册
	resolver.Register(&passthroughBuilder{})
}

manual resolver

manual resolver 可以用于手动将解析的地址发送到 ClientConn。 初始化的时候需要传一个 scheme,因为没有固定 scheme,所以也没有默认被注册。使用的时候需要手动注册 grpc.WithResolvers(r),然后调用 InitialState 初始化地址信息。 因为使用 NewClient 生成的 grpc 客户端只有发起 rpc 调用时才会初始化 resolver,resolver 中的 ClientConn 才能被调用。所以 UpdateStateReportError 方法只能在 rpc 调用后才被调用。或者可以直接使用 Dial 方法就没有这个问题,因为 Dial 会在建立连接成功后才返回 grpc 客户端。

manual resolver 的存在我认为只是为了方便测试,自己实现的话没有必要依赖于它,或者说依赖于它提供的回调接口,完全可以实现个新的 resolver 类型。

r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{
	Addresses: []resolver.Address{
		{Addr: "address"},
	},
})
_, err := grpc.Dial("whatever://localhost",
	grpc.WithTransportCredentials(insecure.NewCredentials()),
	grpc.WithResolvers(r))
if err != nil {
	t.Errorf("dial setup error: %v", err)
}
r.UpdateState(resolver.State{Addresses: []resolver.Address{
	{Addr: "ok"},
}})
r.ReportError(errors.New("example"))

dns resolver

dns resolver 使用的是标准库 net 包中的 LookupHostLookupSRVLookupTXT 做 dns 解析工作的。解析的结果作为后端服务器地址列表。

type NetResolver interface {
	LookupHost(ctx context.Context, host string) (addrs []string, err error)
	LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
	LookupTXT(ctx context.Context, name string) (txts []string, err error)
}

Build 接口

func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	host, port, err := parseTarget(target.Endpoint(), defaultPort)
	if err != nil {
		return nil, err
	}

	// IP 地址不需要解析,实际上退化成了 pass-through resolver
	// IP address.
	if ipAddr, ok := formatIP(host); ok {
		addr := []resolver.Address{{Addr: ipAddr + ":" + port}}
		cc.UpdateState(resolver.State{Addresses: addr})
		return deadResolver{}, nil
	}

	// DNS address (non-IP).
	ctx, cancel := context.WithCancel(context.Background())
	d := &dnsResolver{
		host:                 host,
		port:                 port,
		ctx:                  ctx,
		cancel:               cancel,
		cc:                   cc,
		rn:                   make(chan struct{}, 1),
		disableServiceConfig: opts.DisableServiceConfig,
	}

    // 初始化 netResolver
	d.resolver, err = internal.NewNetResolver(target.URL.Host)
	if err != nil {
		return nil, err
	}

	d.wg.Add(1)
	// 监听解析请求
	go d.watcher()
	return d, nil
}

监听解析请求,解析失败会自动重试。

func (d *dnsResolver) watcher() {
	defer d.wg.Done()
	backoffIndex := 1
	for {
		// dns 解析
		state, err := d.lookup()
		if err != nil {
			// Report error to the underlying grpc.ClientConn.
			d.cc.ReportError(err)
		} else {
			err = d.cc.UpdateState(*state)
		}

		var nextResolutionTime time.Time
		if err == nil {
			// Success resolving, wait for the next ResolveNow. However, also wait 30
			// seconds at the very least to prevent constantly re-resolving.
			// 重置 backoffIndex
			backoffIndex = 1
			nextResolutionTime = internal.TimeNowFunc().Add(MinResolutionInterval)
			// 解析成功后阻塞在这里,直到 resolver 被关闭或者收到解析请求。
			select {
			case <-d.ctx.Done():
				return
			case <-d.rn:
			}
		} else {
			// 解析失败设置下一次解析时间
			// Poll on an error found in DNS Resolver or an error received from
			// ClientConn.
			nextResolutionTime = internal.TimeNowFunc().Add(backoff.DefaultExponential.Backoff(backoffIndex))
			backoffIndex++
		}
		// 等待下一次解析
		select {
		case <-d.ctx.Done():
			return
		case <-internal.TimeAfterFunc(internal.TimeUntilFunc(nextResolutionTime)):
		}
	}
}

主动发起解析请求

func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
	select {
	case d.rn <- struct{}{}:
	default:
	}
}

知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。