Go猜想录
大道至简,悟者天成
gRPC-Go -- #9 流级别流控 strQuota

HTTP/2 支持多路复用,这意味着同一条链路上可以同时传输多个流。在 grpc-go 中,也对每个流实现了流量控制。总体上,每个流会记录其已经发送的字节数,当发送的数据达到一定量时,会暂停发送,等待对方接收数据。对方接收完成后,会通知发送方可以继续传输。

接下来,从源码角度分析具体实现。每个 outStream 都有一个 bytesOutStanding 字段,用于记录已发送的字节数。通过链路的发送窗口大小 l.oiws 减去 str.bytesOutStanding 计算出该流的发送能力 strQuota。如果 strQuota 足以发送当前数据,则允许继续发送;否则,将该流状态改为 waitingOnStreamQuota,暂停发送。数据发送成功后,会将其字节数累加到 bytesOutStanding 中。

// outStream 代表一个向外发送数据的流
type outStream struct {
	...
	bytesOutStanding int
	...
}


// 发送数据
func (l *loopyWriter) processData() (bool, error) {
	...
	// 流允许发送的 quota
	strQuota := int(l.oiws) - str.bytesOutStanding
	if strQuota <= 0 { // stream-level flow control.
		str.state = waitingOnStreamQuota
		return false, nil
	}
	...
	// 数据发送成功
	str.bytesOutStanding += size
	...
}

当接收方成功接收到数据后,会发送一个流的窗口更新帧,帧中包含流的 ID 和需要更新的窗口大小。更新后,如果 strQuota > 0 且当前流的状态为 waitingOnStreamQuota,则会唤醒该流,重新允许其继续发送数据。至于流的窗口更新帧的具体发送过程,请听下回分解。

// 收到了窗口更新帧
func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
	...
	// Find the stream and update it.
	if str, ok := l.estdStreams[w.streamID]; ok {
		str.bytesOutStanding -= int(w.increment)
		if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
			str.state = active
			l.activeStreams.enqueue(str)
			return
		}
	}
}

流的发送能力不仅取决于已发送的数据量,还受链路发送窗口大小的影响。前文提到过,当 BDP 满足更新条件时,会发送 SettingInitialWindowSize 帧,更新对端链路发送窗口大小。

func (t *http2Server) updateFlowControl(n uint32) {
	...
	t.controlBuf.put(&outgoingSettings{
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: n,
			},
		},
	})
}

当对端接收到该设置帧后,会更新其发送窗口大小 l.oiws。如果窗口变大,便会唤醒那些等待 strQuota 的流,通知它们可以继续发送数据。

func (l *loopyWriter) applySettings(ss []http2.Setting) {
	for _, s := range ss {
		switch s.ID {
		case http2.SettingInitialWindowSize:
			o := l.oiws
			l.oiws = s.Val
			if o < l.oiws {
				// If the new limit is greater make all depleted streams active.
				for _, stream := range l.estdStreams {
					if stream.state == waitingOnStreamQuota {
						stream.state = active
						l.activeStreams.enqueue(stream)
					}
				}
			}
		...
	}
}

知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。