gRPC-Go -- #9 流级别流控 strQuota
HTTP/2 支持多路复用,这意味着同一条链路上可以同时传输多个流。在 grpc-go 中,也对每个流实现了流量控制。总体上,每个流会记录其已经发送的字节数,当发送的数据达到一定量时,会暂停发送,等待对方接收数据。对方接收完成后,会通知发送方可以继续传输。
接下来,从源码角度分析具体实现。每个 outStream
都有一个 bytesOutStanding
字段,用于记录已发送的字节数。通过链路的发送窗口大小 l.oiws
减去 str.bytesOutStanding
计算出该流的发送能力 strQuota
。如果 strQuota
足以发送当前数据,则允许继续发送;否则,将该流状态改为 waitingOnStreamQuota
,暂停发送。数据发送成功后,会将其字节数累加到 bytesOutStanding
中。
// outStream 代表一个向外发送数据的流
type outStream struct {
...
bytesOutStanding int
...
}
// 发送数据
func (l *loopyWriter) processData() (bool, error) {
...
// 流允许发送的 quota
strQuota := int(l.oiws) - str.bytesOutStanding
if strQuota <= 0 { // stream-level flow control.
str.state = waitingOnStreamQuota
return false, nil
}
...
// 数据发送成功
str.bytesOutStanding += size
...
}
当接收方成功接收到数据后,会发送一个流的窗口更新帧,帧中包含流的 ID 和需要更新的窗口大小。更新后,如果 strQuota > 0
且当前流的状态为 waitingOnStreamQuota
,则会唤醒该流,重新允许其继续发送数据。至于流的窗口更新帧的具体发送过程,请听下回分解。
// 收到了窗口更新帧
func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
...
// Find the stream and update it.
if str, ok := l.estdStreams[w.streamID]; ok {
str.bytesOutStanding -= int(w.increment)
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
str.state = active
l.activeStreams.enqueue(str)
return
}
}
}
流的发送能力不仅取决于已发送的数据量,还受链路发送窗口大小的影响。前文提到过,当 BDP 满足更新条件时,会发送 SettingInitialWindowSize
帧,更新对端链路发送窗口大小。
func (t *http2Server) updateFlowControl(n uint32) {
...
t.controlBuf.put(&outgoingSettings{
ss: []http2.Setting{
{
ID: http2.SettingInitialWindowSize,
Val: n,
},
},
})
}
当对端接收到该设置帧后,会更新其发送窗口大小 l.oiws
。如果窗口变大,便会唤醒那些等待 strQuota
的流,通知它们可以继续发送数据。
func (l *loopyWriter) applySettings(ss []http2.Setting) {
for _, s := range ss {
switch s.ID {
case http2.SettingInitialWindowSize:
o := l.oiws
l.oiws = s.Val
if o < l.oiws {
// If the new limit is greater make all depleted streams active.
for _, stream := range l.estdStreams {
if stream.state == waitingOnStreamQuota {
stream.state = active
l.activeStreams.enqueue(stream)
}
}
}
...
}
}
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。