- CATALOG -
Go猜想录
大道至简,悟者天成
gRPC-Go -- #5 流级别流控 writeQuota

原理

writeQuota 是在 grpc-go 中实现流量控制的其中一种数据结构,用于本地的流级别的流量控制,不涉及到网络传输。本篇文章将它抽离出来,分析下它能做到哪些功能和这些功能是如何实现的。

// writeQuota is a soft limit on the amount of data a stream can
// schedule before some of it is written out.
type writeQuota struct {
	quota int32
	// get waits on read from when quota goes less than or equal to zero.
	// replenish writes on it when quota goes positive again.
	ch chan struct{}
	// done is triggered in error case.
	done <-chan struct{}
	// replenish is called by loopyWriter to give quota back to.
	// It is implemented as a field so that it can be updated
	// by tests.
	replenish func(n int)
}

// newWriteQuota 用于初始化一个 writeQuota
// sz: 当前可以获取到的 quota 总量
// done: 用于关闭 writeQuota,在外部通过 done 控制 writeQuota 的生命周期
func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
	w := &writeQuota{
		quota: sz,
		ch:    make(chan struct{}, 1),
		done:  done,
	}
	w.replenish = w.realReplenish
	return w
}

// get 用于阻塞式获取 sz 数量的 quota
func (w *writeQuota) get(sz int32) error {
	for {
		// 只要 quota > 0,就允许获取,即使获取后 quota < 0。
		if atomic.LoadInt32(&w.quota) > 0 {
			atomic.AddInt32(&w.quota, -sz)
			return nil
		}
		select {
		case <-w.ch:
			continue
		case <-w.done:
			return errStreamDone
		}
	}
}

// realReplenish 用于补充 n 数量的 quota
// 如果补充数量充足,可以解除因数量不足阻塞在 get 处的调用方
func (w *writeQuota) realReplenish(n int) {
	sz := int32(n)
	a := atomic.AddInt32(&w.quota, sz)
	b := a - sz
	if b <= 0 && a > 0 {
		select {
		case w.ch <- struct{}{}:
		default:
		}
	}
}

使用

writeQuota 是作用于流级别的,是 Stream 的一个字段,生命周期与 Stream 一致。初始化时,writeQuota 默认分配 64 KB。当 Stream 需要向对端发送数据帧时,首先会将 gRPC 级别的数据帧放入内存中的消息队列 controlBuf,然后由帧发送器 loopyWriter 取出并转换为 HTTP/2 数据帧进行发送。

为了防止 controlBuf 中无限制堆积数据帧,writeQuota 的作用开始显现。在放入数据之前,必须先检查是否有足够的 quota(数据大小的配额)。只有成功获取到足够的 quota,才允许将数据帧放入 controlBuf。当 loopyWriter 成功将数据帧发送后,会相应地补充 writeQuota 的配额,实现流量控制。

因为 writeQuota 默认只有 64 KB 大小的 quota,当发送 2 MB 大小的数据会有问题吗?答案是不会有问题,因为 writeQuota 的实现允许 quota 获取后为负值,所以即使数据大小大于 64 KB 也没有问题,仍然允许放入第一个数据帧的,不过后续的数据帧就只能等待了。

当然 loopyWriter 也不是无限制的向对端发送数据的,具体它是如何控制流量的呢,请听下回分解。

// Stream represents an RPC in the transport layer.
type Stream struct {
	...
	wq           *writeQuota
	...
}


// defaultWriteQuota is the default value for number of data
// bytes that each stream can schedule before some of it being
// flushed out.
var defaultWriteQuota = 64 * 1024
// 初始化
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)


// 放数据之前先从 wq 获取数据大小的 quota 获取成功才允许向 controlBuf 放入数据
func (t *http2Client) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error {
	...
	if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota.
	 	if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
	 		_ = reader.Close()
	 		return err
	 	}
	}
	if err := t.controlBuf.put(df); err != nil {
 		_ = reader.Close()
 		return err
 	}
 	return nil
 }


// 当 loopyWriter 成功将数据帧发送出去后,就又会向 writeQuota 补充 quota。
func (l *loopyWriter) processData() (bool, error) {
	...
	str.wq.replenish(size)
	...
}

知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。