Go猜想录
大道至简,悟者天成
gRPC-Go -- #10 流级别流控之窗口更新

本文将介绍 grpc-go 中是如何发送流级别的窗口更新帧来补充相应的 quota,允许发送方继续在流上发送数据的。

inFlow

由于窗口更新帧也会占用网络资源,过于频繁的发送会影响传输效率。inFlow 是用于控制流级别窗口更新帧发送时机的数据结构。初始化时,inFlowlimit 字段默认为 t.initialWindowSize,后续可以通过 BDP 算法更新 limit。如果用户自定义了 opts.InitialWindowSize,则会使用该值覆盖默认的 initialWindowSize,同时也会关闭 BDP 动态更新窗口算法。

inFlow 内部会保存如下字段:

  • limit: 入站流控限制,用于控制未处理数据的大小。
  • pendingData: 已经接收到但尚未被应用程序消费的数据量。
  • pendingUpdate: 应用程序已消费但 grpc 尚未发送窗口更新的数据量。
  • delta: 当应用程序读取的数据大小超过 limit 限制时,接收方额外给予的窗口更新。
type inFlow struct {
	mu sync.Mutex
	// The inbound flow control limit for pending data.
	limit uint32
	// pendingData is the overall data which have been received but not been
	// consumed by applications.
	pendingData uint32
	// The amount of data the application has consumed but grpc has not sent
	// window update for them. Used to reduce window update frequency.
	pendingUpdate uint32
	// delta is the extra window update given by receiver when an application
	// is reading data bigger in size than the inFlow limit.
	delta uint32
}

const (
	defaultWindowSize = 65535
	initialWindowSize = defaultWindowSize // for an RPC
)

支持的方法如下所示:

maybeAdjust

maybeAdjust 被调用时会返回一个给对端发送的窗口更新帧的大小。
内部逻辑如下,先计算出如下变量:

  • estSenderQuota: limit - (pendingData + pendingUpdate) 表示在不发出窗口更新帧时发送方能发出的最大数据量
  • estUntransmittedData: n - f.pendingData 表示发送方可能还需发送的最大数据量 当 estSenderQuota < estUntransmittedData,即 quota 值不足时,会发送 n 大小的窗口更新帧,以提高流发送数据的能力,防止在一次大量传输数据帧的传输中因 quota 值不足而无法传输。为整个消息发送窗口更新,而不仅仅是 estUntransmittedDataestSenderQuota 之间的差值,这在消息有填充时会很有用。
func (f *inFlow) maybeAdjust(n uint32) uint32 {
	...
	estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
	estUntransmittedData := int32(n - f.pendingData)
	if estUntransmittedData > estSenderQuota {
		if f.limit+n > maxWindowSize {
			f.delta = maxWindowSize - f.limit
		} else {
			f.delta = n
		}
		return f.delta
	}
	return 0
}

onData

onData 在收到数据帧时被调用,用来更新 pendingData

func (f *inFlow) onData(n uint32) error {
	f.mu.Lock()
	f.pendingData += n
	if f.pendingData+f.pendingUpdate > f.limit+f.delta {
		limit := f.limit
		rcvd := f.pendingData + f.pendingUpdate
		f.mu.Unlock()
		return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
	}
	f.mu.Unlock()
	return nil
}

onRead

onRead 在应用程序读取数据时被调用,会返回一个给对端发送的窗口更新帧的大小。用来控制发送频率。

func (f *inFlow) onRead(n uint32) uint32 {
	f.mu.Lock()
	if f.pendingData == 0 {
		f.mu.Unlock()
		return 0
	}
	f.pendingData -= n
	if n > f.delta {
		n -= f.delta
		f.delta = 0
	} else {
		f.delta -= n
		n = 0
	}
	f.pendingUpdate += n
	if f.pendingUpdate >= f.limit/4 {
		wu := f.pendingUpdate
		f.pendingUpdate = 0
		f.mu.Unlock()
		return wu
	}
	f.mu.Unlock()
	return 0
}

newLimit

// newLimit updates the inflow window to a new value n.
// It assumes that n is always greater than the old limit.
func (f *inFlow) newLimit(n uint32) {
	f.mu.Lock()
	f.limit = n
	f.mu.Unlock()
}

窗口更新时机

每个 Stream 都有个 fc 字段用来做流级别的流控。

// Stream represents an RPC in the transport layer.
type Stream struct {
	...
	fc           *inFlow
	...
}

接收数据

func (t *http2Server) handleData(f *http2.DataFrame) {
	...
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		...
	}
	...
}

读取数据前

adjustWindow 会在流的初始窗口大小基础上发送额外的窗口更新,当应用程序请求的数据量大于窗口大小时触发。追踪调用链路,最终调用到了 inFlowmaybeAdjust 方法。

func (t *http2Server) adjustWindow(s *Stream, n uint32) {
	if w := s.fc.maybeAdjust(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}

...{
	s := &Stream{
		...
		fc:               &inFlow{limit: uint32(t.initialWindowSize)},
		...
	}
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
}


func (p *parser) recvMsg(maxReceiveMessageSize int) (payloadFormat, mem.BufferSlice, error) {
	// length 是根据数据帧 header 计算出的数据长度
	data, err := p.r.Read(int(length))
	...
}


func (s *Stream) Read(n int) (data mem.BufferSlice, err error) {
	...
	s.requestRead(n)
	...
}

读取数据后

当读取数据后,满足相应条件发送窗口更新帧。

windowHandler: func(n int) {
	t.updateWindow(s, uint32(n))
},


func (t *http2Client) updateWindow(s *Stream, n uint32) {
	if w := s.fc.onRead(n); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
	}
}


type transportReader struct {
	reader *recvBufferReader
	// The handler to control the window update procedure for both this
	// particular stream and the associated transport.
	windowHandler func(int)
	er            error
}

func (t *transportReader) ReadHeader(header []byte) (int, error) {
	n, err := t.reader.ReadHeader(header)
	if err != nil {
		t.er = err
		return 0, err
	}
	t.windowHandler(n)
	return n, nil
}

func (t *transportReader) Read(n int) (mem.Buffer, error) {
	buf, err := t.reader.Read(n)
	if err != nil {
		t.er = err
		return buf, err
	}
	t.windowHandler(buf.Len())
	return buf, nil
}

知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。