gRPC-Go -- #5 流级别流控 writeQuota
原理
writeQuota
是在 grpc-go
中实现流量控制的其中一种数据结构,用于本地的流级别的流量控制,不涉及到网络传输。本篇文章将它抽离出来,分析下它能做到哪些功能和这些功能是如何实现的。
// writeQuota is a soft limit on the amount of data a stream can
// schedule before some of it is written out.
type writeQuota struct {
quota int32
// get waits on read from when quota goes less than or equal to zero.
// replenish writes on it when quota goes positive again.
ch chan struct{}
// done is triggered in error case.
done <-chan struct{}
// replenish is called by loopyWriter to give quota back to.
// It is implemented as a field so that it can be updated
// by tests.
replenish func(n int)
}
// newWriteQuota 用于初始化一个 writeQuota
// sz: 当前可以获取到的 quota 总量
// done: 用于关闭 writeQuota,在外部通过 done 控制 writeQuota 的生命周期
func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
w := &writeQuota{
quota: sz,
ch: make(chan struct{}, 1),
done: done,
}
w.replenish = w.realReplenish
return w
}
// get 用于阻塞式获取 sz 数量的 quota
func (w *writeQuota) get(sz int32) error {
for {
// 只要 quota > 0,就允许获取,即使获取后 quota < 0。
if atomic.LoadInt32(&w.quota) > 0 {
atomic.AddInt32(&w.quota, -sz)
return nil
}
select {
case <-w.ch:
continue
case <-w.done:
return errStreamDone
}
}
}
// realReplenish 用于补充 n 数量的 quota
// 如果补充数量充足,可以解除因数量不足阻塞在 get 处的调用方
func (w *writeQuota) realReplenish(n int) {
sz := int32(n)
a := atomic.AddInt32(&w.quota, sz)
b := a - sz
if b <= 0 && a > 0 {
select {
case w.ch <- struct{}{}:
default:
}
}
}
使用
writeQuota
是作用于流级别的,是 Stream
的一个字段,生命周期与 Stream
一致。初始化时,writeQuota
默认分配 64 KB。当 Stream
需要向对端发送数据帧时,首先会将 gRPC 级别的数据帧放入内存中的消息队列 controlBuf
,然后由帧发送器 loopyWriter
取出并转换为 HTTP/2 数据帧进行发送。
为了防止 controlBuf
中无限制堆积数据帧,writeQuota
的作用开始显现。在放入数据之前,必须先检查是否有足够的 quota
(数据大小的配额)。只有成功获取到足够的 quota
,才允许将数据帧放入 controlBuf
。当 loopyWriter
成功将数据帧发送后,会相应地补充 writeQuota
的配额,实现流量控制。
因为 writeQuota
默认只有 64 KB 大小的 quota
,当发送 2 MB 大小的数据会有问题吗?答案是不会有问题,因为 writeQuota
的实现允许 quota
获取后为负值,所以即使数据大小大于 64 KB 也没有问题,仍然允许放入第一个数据帧的,不过后续的数据帧就只能等待了。
当然 loopyWriter
也不是无限制的向对端发送数据的,具体它是如何控制流量的呢,请听下回分解。
// Stream represents an RPC in the transport layer.
type Stream struct {
...
wq *writeQuota
...
}
// defaultWriteQuota is the default value for number of data
// bytes that each stream can schedule before some of it being
// flushed out.
var defaultWriteQuota = 64 * 1024
// 初始化
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
// 放数据之前先从 wq 获取数据大小的 quota 获取成功才允许向 controlBuf 放入数据
func (t *http2Client) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error {
...
if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota.
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
_ = reader.Close()
return err
}
}
if err := t.controlBuf.put(df); err != nil {
_ = reader.Close()
return err
}
return nil
}
// 当 loopyWriter 成功将数据帧发送出去后,就又会向 writeQuota 补充 quota。
func (l *loopyWriter) processData() (bool, error) {
...
str.wq.replenish(size)
...
}
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。