BDP 概念
BDP(Bandwidth-Delay Product,带宽时延积)算法是一种用于优化网络传输效率的算法,尤其是在高带宽、高延迟网络中广泛应用。BDP 是衡量网络容量的一个关键指标,主要用于确定网络中可以有效传输的数据量,计算公式如下:
BDP = 带宽(bps)× 往返时延(RTT,秒)
- 带宽(Bandwidth):网络可以传输的最大速率,单位是比特每秒(bps)。
- 往返时延(RTT):从发送端发送数据包到接收端确认收到该数据包所需的时间,单位是秒(s)。
BDP 的单位是比特,表示在网络中任何给定时间可以在不拥塞的情况下传输的数据量。
作用与意义
在实际应用中,BDP 的计算结果帮助我们确定网络中的最大窗口大小,以便在不造成网络拥塞的情况下,尽可能提高数据传输效率。在 grpc-go 中,以 server 为例,在 server 计算出 BDP 若满足更新条件,会立刻更新 server 的接收窗口大小,同时会向 client 发送 bdp,让 client 更新发送窗口大小,以充分利用可用的带宽。需要注意的是并不会改动 server 的发送窗口,也即不会影响到 server 的发送数据的速率。当然,这个算法是对称的,server 和 client 各自会维护一个自己的 BDP。
举例说明:假设网络的带宽为 1 Gbps,RTT 是 50 毫秒,则 BDP 为:
BDP = 1×10^9×50×10^−3 = 50×10^6 比特
这意味着你可以在此网络中保持大约 50 Mb 的数据“在飞行中”,即发送方可以发送 50 Mb 的数据而无需等待接收方的确认。
算法的核心思路
- BDP Ping:每当接收方收到一个数据帧(数据包)时,它就发送一个 BDP Ping。这个 ping 只是一个特殊的心跳包,带有唯一标识符,用于估算 BDP。
- 开始计数:发送 BDP ping 后,接收方会开始统计它接收到的字节总数。这些字节包括触发该 BDP ping 的数据帧以及在 BDP ping 被发送后继续接收到的字节。
- 等待 BDP Ping 的 ACK:接收方会一直统计,直到它收到 BDP ping 的确认(acknowledgement)。这个过程中,统计的字节数量可以看作是接收方在大约 1.5 RTT(Round-Trip Time,往返时延)内接收到的有效数据量。
- 估算 BDP:这个统计出的字节总数是对 BDP 的一个有效估算值。大约 1.5 RTT 的数据量相当于 BDP * 1.5。
- 窗口调整:根据接收到的字节总数,接收方可以判断当前的窗口大小是否足够:
- 如果接收到的字节总数接近当前窗口大小(例如超过了窗口大小的 2/3),说明当前的窗口大小太小,限制了传输效率。
- 于是,接收方会增加窗口大小,将其调整为 2 倍的 BDP,以允许更多的数据通过。
为什么是 1.5 RTT?
这是因为在网络传输过程中,接收方在发送 BDP Ping 后会有一段时间的延迟,而它同时也在继续接收数据。因此,接收到 BDP Ping 的 ACK 需要经历一次完整的 RTT,而接收方会在这段时间内接收到额外的数据。这使得我们实际计算的是大约 1.5 RTT 内接收到的数据总量。
带宽校正机制
在 BDP 算法中,随着接收方窗口大小的增加,接收方每次能采样的数据量也会增加。由于 BDP 是基于接收到的数据量来估算的,采样的字节越多,BDP 的估算值就越大,窗口大小就会进一步增加。缓冲区膨胀(buffer-bloat)指的就是这个现象,窗口大小不断膨胀,但并不是因为网络带宽真正增加了,而是因为窗口变大了。由于窗口持续增加,网络延迟可能也会不断上升,进一步影响传输效率。
举个简单的例子,假设开始时窗口大小是 10MB,接收方通过 BDP 估算,认为网络的 BDP 是 10MB。于是窗口扩大到 20MB。下一次 BDP 估算时,由于窗口变大,接收方能采样到 20MB 的数据,于是 BDP 估算变为 20MB,接着窗口又增大到 40MB。这个过程会持续下去,窗口越来越大,而 BDP 估算也会不断增加。
为了防止这种无限膨胀的情况,算法引入了带宽估算的机制:
- 带宽 = 采样字节数 / (RTT * 1.5)。
- 在每次采样时,通过这个公式计算当前带宽,并将其与之前的最大带宽进行比较。
- 只有当新的带宽比之前的最大带宽增加时,才会增加窗口大小。这意味着只有在实际检测到网络带宽提升时,才会调整窗口,避免因窗口增大而造成的 BDP 增大。
算法实现
BDP Ping 帧的数据是 [8]byte{2, 4, 16, 16, 9, 14, 7, 7}
,其实就是 bdpping
的编码,是个小彩蛋。因为数据都是由帧发送器 loopyWriter
统一发送的,所以为了保证数据尽可能小的误差,是在发送前一刻才初始化了 发送时间
字段。
func (t *http2Server) handleData(f *http2.DataFrame) {
...
// 接收到第一个数据帧,触发 bdp 估算,发送 bdp ping 帧
if sendBDPPing {
// Avoid excessive ping detection (e.g. in an L7 proxy)
// by sending a window update prior to the BDP ping.
if w := t.fc.reset(); w > 0 {
t.controlBuf.put(&outgoingWindowUpdate{
streamID: 0,
increment: w,
})
}
t.controlBuf.put(bdpPing)
}
...
}
func (t *http2Server) handlePing(f *http2.PingFrame) {
if f.IsAck() {
...
// Maybe it's a BDP ping.
if t.bdpEst != nil {
// 收到 bdp ping ack,开始估算 bdp
// 估算后若满足更新条件,会立刻更新 server 的接收窗口大小
// 同时会向 client 发送 bdp,让 client 更新发送窗口大小
t.bdpEst.calculate(f.Data)
}
return
}
...
}
const (
// bdpLimit is the maximum value the flow control windows will be increased
// to. TCP typically limits this to 4MB, but some systems go up to 16MB.
// Since this is only a limit, it is safe to make it optimistic.
bdpLimit = (1 << 20) * 16
// alpha is a constant factor used to keep a moving average
// of RTTs.
alpha = 0.9
// If the current bdp sample is greater than or equal to
// our beta * our estimated bdp and the current bandwidth
// sample is the maximum bandwidth observed so far, we
// increase our bbp estimate by a factor of gamma.
beta = 0.66
// To put our bdp to be smaller than or equal to twice the real BDP,
// we should multiply our current sample with 4/3, however to round things out
// we use 2 as the multiplication factor.
gamma = 2
)
// Adding arbitrary data to ping so that its ack can be identified.
// Easter-egg: what does the ping message say?
var bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}}
type bdpEstimator struct {
// sentAt is the time when the ping was sent.
sentAt time.Time
mu sync.Mutex
// bdp is the current bdp estimate.
bdp uint32
// sample is the number of bytes received in one measurement cycle.
sample uint32
// bwMax is the maximum bandwidth noted so far (bytes/sec).
bwMax float64
// bool to keep track of the beginning of a new measurement cycle.
isSent bool
// Callback to update the window sizes.
updateFlowControl func(n uint32)
// sampleCount is the number of samples taken so far.
sampleCount uint64
// round trip time (seconds)
rtt float64
}
// timesnap registers the time bdp ping was sent out so that
// network rtt can be calculated when its ack is received.
// It is called (by controller) when the bdpPing is
// being written on the wire.
func (b *bdpEstimator) timesnap(d [8]byte) {
if bdpPing.data != d {
return
}
b.sentAt = time.Now()
}
// add adds bytes to the current sample for calculating bdp.
// It returns true only if a ping must be sent. This can be used
// by the caller (handleData) to make decision about batching
// a window update with it.
func (b *bdpEstimator) add(n uint32) bool {
b.mu.Lock()
defer b.mu.Unlock()
if b.bdp == bdpLimit {
return false
}
if !b.isSent {
b.isSent = true
b.sample = n
b.sentAt = time.Time{}
b.sampleCount++
return true
}
b.sample += n
return false
}
// calculate is called when an ack for a bdp ping is received.
// Here we calculate the current bdp and bandwidth sample and
// decide if the flow control windows should go up.
func (b *bdpEstimator) calculate(d [8]byte) {
// Check if the ping acked for was the bdp ping.
if bdpPing.data != d {
return
}
b.mu.Lock()
rttSample := time.Since(b.sentAt).Seconds()
if b.sampleCount < 10 {
// Bootstrap rtt with an average of first 10 rtt samples.
b.rtt += (rttSample - b.rtt) / float64(b.sampleCount)
} else {
// Heed to the recent past more.
b.rtt += (rttSample - b.rtt) * float64(alpha)
}
b.isSent = false
// The number of bytes accumulated so far in the sample is smaller
// than or equal to 1.5 times the real BDP on a saturated connection.
bwCurrent := float64(b.sample) / (b.rtt * float64(1.5))
if bwCurrent > b.bwMax {
b.bwMax = bwCurrent
}
// If the current sample (which is smaller than or equal to the 1.5 times the real BDP) is
// greater than or equal to 2/3rd our perceived bdp AND this is the maximum bandwidth seen so far, we
// should update our perception of the network BDP.
if float64(b.sample) >= beta*float64(b.bdp) && bwCurrent == b.bwMax && b.bdp != bdpLimit {
sampleFloat := float64(b.sample)
b.bdp = uint32(gamma * sampleFloat)
if b.bdp > bdpLimit {
b.bdp = bdpLimit
}
bdp := b.bdp
b.mu.Unlock()
b.updateFlowControl(bdp)
return
}
b.mu.Unlock()
}
// updateFlowControl updates the incoming flow control windows
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Server) updateFlowControl(n uint32) {
t.mu.Lock()
for _, s := range t.activeStreams {
s.fc.newLimit(n)
}
t.initialWindowSize = int32(n)
t.mu.Unlock()
t.controlBuf.put(&outgoingWindowUpdate{
streamID: 0,
increment: t.fc.newLimit(n),
})
t.controlBuf.put(&outgoingSettings{
ss: []http2.Setting{
{
ID: http2.SettingInitialWindowSize,
Val: n,
},
},
})
}
如何开启 BDP 算法
如果不加额外的配置项,grpc-go 框架默认就会开启 BDP 算法进行窗口更新。
默认的窗口大小是,以 server 端为例,server 的接收窗口大小是 65535
,client 的发送窗口大小是 65535
。然后会由 BDP 进行动态的窗口更新。
// The default value of flow control window size in HTTP2 spec.
defaultWindowSize = 65535
// The initial window size for flow control.
initialWindowSize = defaultWindowSize // for an RPC
但是如果初始化时,传入了下面两个方法的任意一个。即手动设置了 流的窗口大小
或 链路的窗口大小
,则 BDP 算法就会关闭,不再动态调整窗口大小。
// WithInitialWindowSize returns a DialOption which sets the value for initial
// window size on a stream. The lower bound for window size is 64K and any value
// smaller than that will be ignored.
func WithInitialWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialWindowSize = s
})
}
// WithInitialConnWindowSize returns a DialOption which sets the value for
// initial window size on a connection. The lower bound for window size is 64K
// and any value smaller than that will be ignored.
func WithInitialConnWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialConnWindowSize = s
})
}
总结
BDP 算法通过动态估算 BDP 来调整窗口大小,确保数据传输尽可能高效。通过不断监测接收的数据量,并根据 BDP 调整窗口,能够确保网络连接既不浪费带宽,也不过度拥塞。
有了实时调整的发送窗口大小后,又是如何通过这个窗口做到流量控制的呢?请听下回分解。
延伸阅读
- BDP estimation and dynamic flow control window
- gRPC Deep Dive: Prevent Your Service From Overtaking Itself - Lidi Zheng, Google
- gRPC 流量控制详解
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。