Go猜想录
大道至简,悟者天成
gRPC-Go -- #8 链路级别流控之窗口更新

当接收方处理收到的数据帧时,可能会发送链路级别的窗口更新帧,通知发送方可以继续发送数据。本文将详细介绍窗口更新帧的触发条件,以及何时满足发送窗口更新帧的要求。

trInFlow

由于窗口更新帧也会占用网络资源,频繁发送会影响传输效率。trInFlow 是用于控制链路级别窗口更新帧发送时机的数据结构。初始化时,trInFlowlimit 字段默认为 initialWindowSize。如果用户自定义了 copts.InitialConnWindowSize,则会使用该值覆盖默认的 initialWindowSize,同时也会关闭 BDP 动态更新窗口算法。

type trInFlow struct {
	limit               uint32  // 当前的流量窗口大小(limit)
	unacked             uint32  // 未确认的字节数(未被ACK的字节数)
	effectiveWindowSize uint32  // 实际可用的窗口大小(limit - unacked)
}

const (
	defaultWindowSize = 65535
	initialWindowSize = defaultWindowSize // for an RPC
)

支持的方法如下所示:

// 更新窗口大小
// 会更新 limit 值,并返回和原来的差值。
func (f *trInFlow) newLimit(n uint32) uint32 {
	d := n - f.limit
	f.limit = n
	f.updateEffectiveWindowSize()
	return d
}

// 读取数据
func (f *trInFlow) onData(n uint32) uint32 {
	f.unacked += n
	if f.unacked >= f.limit/4 {
		return f.reset()
	}
	f.updateEffectiveWindowSize()
	return 0
}

// 重置
func (f *trInFlow) reset() uint32 {
	w := f.unacked
	f.unacked = 0
	f.updateEffectiveWindowSize()
	return w
}

// 更新实际可用的窗口大小
func (f *trInFlow) updateEffectiveWindowSize() {
	atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked)
}

func (f *trInFlow) getSize() uint32 {
	return atomic.LoadUint32(&f.effectiveWindowSize)
}

窗口更新时机

BDP 变化

当 BDP 发生变化时,会发送窗口大小更新帧,streamID 为 0 表示这是链路级别的发送窗口更新。注意 increment 可以是正数也可以是负数。

func (t *http2Server) updateFlowControl(n uint32) {
	...
	t.controlBuf.put(&outgoingWindowUpdate{
		streamID:  0,
		increment: t.fc.newLimit(n),
	})
	...
}

接收到一定量数据

每次收到数据帧时,trInFlowunacked 大小都会增加,直到其累积到一定阈值后 f.unacked >= f.limit/4 才触发窗口更新帧的发送,从而有效控制了发送频率。

// 接收数据帧时可能会发送窗口更新帧
if w := t.fc.onData(size); w > 0 {
	t.controlBuf.put(&outgoingWindowUpdate{
		streamID:  0,
		increment: w,
	})
}

发送 BDP Ping 帧前

为了避免在某些情况下(例如在第七层代理中)被误认为发送了过多的 ping 操作,会在发送 BDP Ping 之前,如果 unacked > 0 ,则先发送一个窗口更新帧。

// 发送 bdpPing 帧前如果 unacked > 0 会发送窗口更新帧
if sendBDPPing {
	// Avoid excessive ping detection (e.g. in an L7 proxy)
	// by sending a window update prior to the BDP ping.
	if w := t.fc.reset(); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	t.controlBuf.put(bdpPing)
}

关于 grpc-go 在链路级别的流量控制就介绍完了。在 grpc-go 中,也对每个流实现了流量控制,具体是如何实现的呢,请听下回分解。


知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。