gRPC-Go -- #10 流级别流控之窗口更新
本文将介绍 grpc-go 中是如何发送流级别的窗口更新帧来补充相应的 quota,允许发送方继续在流上发送数据的。
inFlow
由于窗口更新帧也会占用网络资源,过于频繁的发送会影响传输效率。inFlow
是用于控制流级别窗口更新帧发送时机的数据结构。初始化时,inFlow
的 limit
字段默认为 t.initialWindowSize
,后续可以通过 BDP 算法更新 limit
。如果用户自定义了 opts.InitialWindowSize
,则会使用该值覆盖默认的 initialWindowSize
,同时也会关闭 BDP 动态更新窗口算法。
inFlow
内部会保存如下字段:
- limit: 入站流控限制,用于控制未处理数据的大小。
- pendingData: 已经接收到但尚未被应用程序消费的数据量。
- pendingUpdate: 应用程序已消费但 grpc 尚未发送窗口更新的数据量。
- delta: 当应用程序读取的数据大小超过 limit 限制时,接收方额外给予的窗口更新。
type inFlow struct {
mu sync.Mutex
// The inbound flow control limit for pending data.
limit uint32
// pendingData is the overall data which have been received but not been
// consumed by applications.
pendingData uint32
// The amount of data the application has consumed but grpc has not sent
// window update for them. Used to reduce window update frequency.
pendingUpdate uint32
// delta is the extra window update given by receiver when an application
// is reading data bigger in size than the inFlow limit.
delta uint32
}
const (
defaultWindowSize = 65535
initialWindowSize = defaultWindowSize // for an RPC
)
支持的方法如下所示:
maybeAdjust
maybeAdjust
被调用时会返回一个给对端发送的窗口更新帧的大小。
内部逻辑如下,先计算出如下变量:
estSenderQuota: limit - (pendingData + pendingUpdate)
表示在不发出窗口更新帧时发送方能发出的最大数据量estUntransmittedData: n - f.pendingData
表示发送方可能还需发送的最大数据量 当estSenderQuota < estUntransmittedData
,即quota
值不足时,会发送 n 大小的窗口更新帧,以提高流发送数据的能力,防止在一次大量传输数据帧的传输中因 quota 值不足而无法传输。为整个消息发送窗口更新,而不仅仅是estUntransmittedData
和estSenderQuota
之间的差值,这在消息有填充时会很有用。
func (f *inFlow) maybeAdjust(n uint32) uint32 {
...
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
estUntransmittedData := int32(n - f.pendingData)
if estUntransmittedData > estSenderQuota {
if f.limit+n > maxWindowSize {
f.delta = maxWindowSize - f.limit
} else {
f.delta = n
}
return f.delta
}
return 0
}
onData
onData 在收到数据帧时被调用,用来更新 pendingData
func (f *inFlow) onData(n uint32) error {
f.mu.Lock()
f.pendingData += n
if f.pendingData+f.pendingUpdate > f.limit+f.delta {
limit := f.limit
rcvd := f.pendingData + f.pendingUpdate
f.mu.Unlock()
return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
}
f.mu.Unlock()
return nil
}
onRead
onRead 在应用程序读取数据时被调用,会返回一个给对端发送的窗口更新帧的大小。用来控制发送频率。
func (f *inFlow) onRead(n uint32) uint32 {
f.mu.Lock()
if f.pendingData == 0 {
f.mu.Unlock()
return 0
}
f.pendingData -= n
if n > f.delta {
n -= f.delta
f.delta = 0
} else {
f.delta -= n
n = 0
}
f.pendingUpdate += n
if f.pendingUpdate >= f.limit/4 {
wu := f.pendingUpdate
f.pendingUpdate = 0
f.mu.Unlock()
return wu
}
f.mu.Unlock()
return 0
}
newLimit
// newLimit updates the inflow window to a new value n.
// It assumes that n is always greater than the old limit.
func (f *inFlow) newLimit(n uint32) {
f.mu.Lock()
f.limit = n
f.mu.Unlock()
}
窗口更新时机
每个 Stream 都有个 fc 字段用来做流级别的流控。
// Stream represents an RPC in the transport layer.
type Stream struct {
...
fc *inFlow
...
}
接收数据
func (t *http2Server) handleData(f *http2.DataFrame) {
...
if size > 0 {
if err := s.fc.onData(size); err != nil {
t.closeStream(s, true, http2.ErrCodeFlowControl, false)
return
}
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
}
}
...
}
...
}
读取数据前
adjustWindow 会在流的初始窗口大小基础上发送额外的窗口更新,当应用程序请求的数据量大于窗口大小时触发。追踪调用链路,最终调用到了 inFlow
的 maybeAdjust
方法。
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
if w := s.fc.maybeAdjust(n); w > 0 {
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
}
}
...{
s := &Stream{
...
fc: &inFlow{limit: uint32(t.initialWindowSize)},
...
}
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
}
func (p *parser) recvMsg(maxReceiveMessageSize int) (payloadFormat, mem.BufferSlice, error) {
// length 是根据数据帧 header 计算出的数据长度
data, err := p.r.Read(int(length))
...
}
func (s *Stream) Read(n int) (data mem.BufferSlice, err error) {
...
s.requestRead(n)
...
}
读取数据后
当读取数据后,满足相应条件发送窗口更新帧。
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
},
func (t *http2Client) updateWindow(s *Stream, n uint32) {
if w := s.fc.onRead(n); w > 0 {
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
}
}
type transportReader struct {
reader *recvBufferReader
// The handler to control the window update procedure for both this
// particular stream and the associated transport.
windowHandler func(int)
er error
}
func (t *transportReader) ReadHeader(header []byte) (int, error) {
n, err := t.reader.ReadHeader(header)
if err != nil {
t.er = err
return 0, err
}
t.windowHandler(n)
return n, nil
}
func (t *transportReader) Read(n int) (mem.Buffer, error) {
buf, err := t.reader.Read(n)
if err != nil {
t.er = err
return buf, err
}
t.windowHandler(buf.Len())
return buf, nil
}
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。