Go猜想录
大道至简,悟者天成
手搓连接池

连接池

客户端创建的连接往往是一次性使用,但创建连接的成本非常高:

  • 需要系统调用
  • TCP 连接需要完成三次握手
  • 在高并发情况下,可能会耗尽文件描述符

连接池的目的是复用已创建的连接,避免重复创建,从而提高性能。

开源实例 —— silenceper/pool

silenceper/pool

Github 地址:github.com/silenceper/pool

  • Initialcap:这种参数是在初始化的时候直接创建好的连接数量。过小,启动的时候可能大部分请求都需要创建连接;过大,则浪费。
  • MaxIdle:最大空闲连接数,过大浪费,过小无法应付突发流量
  • MaxCap: 最大连接数
//factory 创建连接的方法
factory := func() (interface{}, error) { return net.Dial("tcp", "127.0.0.1:4000") }

//close 关闭连接的方法
close := func(v interface{}) error { return v.(net.Conn).Close() }

//ping 检测连接的方法
//ping := func(v interface{}) error { return nil }

//创建一个连接池: 初始化5,最大空闲连接是20,最大并发连接30
poolConfig := &pool.Config{
	InitialCap: 5,//资源池初始连接数
	MaxIdle:   20,//最大空闲连接数
	MaxCap:     30,//最大并发连接数
	Factory:    factory,
	Close:      close,
	//Ping:       ping,
	//连接最大空闲时间,超过该时间的连接 将会关闭,可避免空闲时连接EOF,自动失效的问题
	IdleTimeout: 15 * time.Second,
}
p, err := pool.NewChannelPool(poolConfig)
if err != nil {
	fmt.Println("err=", err)
}

一般连接池处理流程

  • 阻塞的地方可以有超时控制,例如最多阻塞 15s
  • 从空闲处取出来的连接,可能需要进一步检查这个连接有没有超时(就是很久没用了)

net-4.png

  • Put 会先看有没有阻塞的 goroutine(线程),有就直接转交
  • 如果空闲队列满了,又没有人需要连接,那么需要关闭这个连接

net-5.png

Get 方法

// channelPool 存放连接信息
type channelPool struct {
	mu                       sync.RWMutex
	conns                    chan *idleConn // 空闲的
	factory                  func() (interface{}, error)
	close                    func(interface{}) error
	ping                     func(interface{}) error
	idleTimeout, waitTimeOut time.Duration
	maxActive                int
	openingConns             int
	connReqs                 []chan connReq // 被阻塞的 Get 请求
}
// Get 从pool中取一个连接
func (c *channelPool) Get() (interface{}, error) {
	conns := c.getConns()
	if conns == nil {
		return nil, ErrClosed
	}
	for {
		select {
		case wrapConn := <-conns:
			if wrapConn == nil {
				return nil, ErrClosed
			}
			//判断是否超时,超时则丢弃
			if timeout := c.idleTimeout; timeout > 0 {
				if wrapConn.t.Add(timeout).Before(time.Now()) {
					//丢弃并关闭该连接
					c.Close(wrapConn.conn)
					continue
				}
			}
			//判断是否失效,失效则丢弃,如果用户没有设定 ping 方法,就不检查
			if c.ping != nil {
				if err := c.Ping(wrapConn.conn); err != nil {
					c.Close(wrapConn.conn)
					continue
				}
			}
			return wrapConn.conn, nil
		default:
			c.mu.Lock()
			log.Debugf("openConn %v %v", c.openingConns, c.maxActive)
			if c.openingConns >= c.maxActive {
				req := make(chan connReq, 1)
				c.connReqs = append(c.connReqs, req)
				c.mu.Unlock()
				ret, ok := <-req
				if !ok {
					return nil, ErrMaxActiveConnReached
				}
				if timeout := c.idleTimeout; timeout > 0 {
					if ret.idleConn.t.Add(timeout).Before(time.Now()) {
						//丢弃并关闭该连接
						c.Close(ret.idleConn.conn)
						continue
					}
				}
				return ret.idleConn.conn, nil
			}
			if c.factory == nil {
				c.mu.Unlock()
				return nil, ErrClosed
			}
			conn, err := c.factory()
			if err != nil {
				c.mu.Unlock()
				return nil, err
			}
			c.openingConns++
			c.mu.Unlock()
			return conn, nil
		}
	}
}

Put 方法

// Put 将连接放回pool中
func (c *channelPool) Put(conn interface{}) error {
	if conn == nil {
		return errors.New("connection is nil. rejecting")
	}

	c.mu.Lock()

	if c.conns == nil {
		c.mu.Unlock()
		return c.Close(conn)
	}

	if l := len(c.connReqs); l > 0 {
		req := c.connReqs[0]
		copy(c.connReqs, c.connReqs[1:])
		c.connReqs = c.connReqs[:l-1]
		req <- connReq{
			idleConn: &idleConn{conn: conn, t: time.Now()},
		}
		c.mu.Unlock()
		return nil
	} else {
		select {
		case c.conns <- &idleConn{conn: conn, t: time.Now()}:
			c.mu.Unlock()
			return nil
		default:
			c.mu.Unlock()
			//连接池已满,直接关闭该连接
			return c.Close(conn)
		}
	}
}

总结

  • Get 要考虑:
    • 有空闲连接,直接返回
    • 否则,没超过最大连接数,直接创建新的
    • 否则,阻塞调用方
  • Put 要考虑:
    • 有 Get 请求被阻塞,把连接丢过去
    • 否则,没超过最大空闲连接数,放到空闲列表
    • 否则,直接关闭

net-6.png

连接池运作图解

起步

刚开始啥都没有,直接创建一个新的。

net-7.png

超过上限

假如说我们不断请求连接,直到超过了十个连接。

请求被阻塞。

net-8.png

放回去,有阻塞请求

假如说这时候有人用完了连接,就放回来了。

唤醒一个请求,然后将连接交过去。

net-9.png

放回去空闲连接队列

如果这个时候没有阻塞请求,并且此时空闲连接队列还没有满,那么就放回去空闲连接队列。

net-10.png

空闲连接队列满了

空闲队列都满了,只能关掉这个连接了。

net-11.png

从空闲连接队列 GET

空闲队列有可用连接,直接拿。

net-12.png

开源实例 —— sql.DB

连接池管理

它也基本遵循前面总结的:

  • 利用 channel 来管理空闲连接
  • 利用一个队列来阻塞请求

sql.DB 有很多细节,这里我们只是看它怎么管连接的。

type DB struct {
	// Atomic access only. At top of struct to prevent mis-alignment
	// on 32-bit platforms. Of type time.Duration.
	waitDuration int64 // Total time waited for new connections.

	connector driver.Connector
	// numClosed is an atomic counter which represents a total number of
	// closed connections. Stmt.openStmt checks it before cleaning closed
	// connections in Stmt.css.
	numClosed uint64

	mu           sync.Mutex    // protects following fields
	freeConn     []*driverConn // free connections ordered by returnedAt oldest to newest
	connRequests map[uint64]chan connRequest
	...
}
// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
	db.mu.Lock()
	if db.closed {
		db.mu.Unlock()
		return nil, errDBClosed
	}
	// Check if the context is expired.
	select {
	default:
	case <-ctx.Done():
		db.mu.Unlock()
		return nil, ctx.Err()
	}
	lifetime := db.maxLifetime

	// Prefer a free connection, if possible.
	last := len(db.freeConn) - 1
	if strategy == cachedOrNewConn && last >= 0 {
		// Reuse the lowest idle time connection so we can close
		// connections which remain idle as soon as possible.
		conn := db.freeConn[last]
		db.freeConn = db.freeConn[:last]
		conn.inUse = true
		if conn.expired(lifetime) {
			db.maxLifetimeClosed++
			db.mu.Unlock()
			conn.Close()
			return nil, driver.ErrBadConn
		}
		db.mu.Unlock()

		// Reset the session if required.
		if err := conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
			conn.Close()
			return nil, err
		}

		return conn, nil
	}

	// Out of free connections or we were asked not to use one. If we're not
	// allowed to open any more connections, make a request and wait.
	if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
		// Make the connRequest channel. It's buffered so that the
		// connectionOpener doesn't block while waiting for the req to be read.
		req := make(chan connRequest, 1)
		reqKey := db.nextRequestKeyLocked()
		db.connRequests[reqKey] = req
		db.waitCount++
		db.mu.Unlock()

		waitStart := nowFunc()

		// Timeout the connection request with the context.
		select {
		case <-ctx.Done():
			// Remove the connection request and ensure no value has been sent
			// on it after removing.
			db.mu.Lock()
			delete(db.connRequests, reqKey)
			db.mu.Unlock()

			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			select {
			default:
			case ret, ok := <-req:
				if ok && ret.conn != nil {
					db.putConn(ret.conn, ret.err, false)
				}
			}
			return nil, ctx.Err()
		case ret, ok := <-req:
			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			if !ok {
				return nil, errDBClosed
			}
			// Only check if the connection is expired if the strategy is cachedOrNewConns.
			// If we require a new connection, just re-use the connection without looking
			// at the expiry time. If it is expired, it will be checked when it is placed
			// back into the connection pool.
			// This prioritizes giving a valid connection to a client over the exact connection
			// lifetime, which could expire exactly after this point anyway.
			if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
				db.mu.Lock()
				db.maxLifetimeClosed++
				db.mu.Unlock()
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			if ret.conn == nil {
				return nil, ret.err
			}

			// Reset the session if required.
			if err := ret.conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
				ret.conn.Close()
				return nil, err
			}
			return ret.conn, ret.err
		}
	}

	db.numOpen++ // optimistically
	db.mu.Unlock()
	ci, err := db.connector.Connect(ctx)
	if err != nil {
		db.mu.Lock()
		db.numOpen-- // correct for earlier optimism
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		return nil, err
	}
	db.mu.Lock()
	dc := &driverConn{
		db:         db,
		createdAt:  nowFunc(),
		returnedAt: nowFunc(),
		ci:         ci,
		inUse:      true,
	}
	db.addDepLocked(dc, dc)
	db.mu.Unlock()
	return dc, nil
}

putConn

因为本身 DB 比较复杂,所以在 putConn 的时候要做很多校验,维持好整体状态:

  • 处理 ErrBadConn 的情况
  • 确保 dc 并没有任何人在使用
  • 处理超时
// putConn adds a connection to the db's free pool.
// err is optionally the last error that occurred on this connection.
// err 可以简单理解为关闭的原因
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
	if !errors.Is(err, driver.ErrBadConn) {
		if !dc.validateConnection(resetSession) {
			err = driver.ErrBadConn
		}
	}
	db.mu.Lock()
	if !dc.inUse {
		db.mu.Unlock()
		if debugGetPut {

这个方法步骤和 silenceper/pool 的 Put 流程几乎一致。

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
	// 超出上限,直接关闭
	if db.closed {
		return false
	}
	if db.maxOpen > 0 && db.numOpen > db.maxOpen {
		return false
	}
	if c := len(db.connRequests); c > 0 {
		var req chan connRequest
		var reqKey uint64
		for reqKey, req = range db.connRequests {
			break
		}
		delete(db.connRequests, reqKey) // Remove from pending requests.
		if err == nil {
			dc.inUse = true
		}
		// 将连接丢给被阻塞请求
		req <- connRequest{
			conn: dc,
			err:  err,
		}
		return true
	} else if err == nil && !db.closed {
		if db.maxIdleConnsLocked() > len(db.freeConn) {
			// 放到空闲列表
			db.freeConn = append(db.freeConn, dc)
			db.startCleanerLocked()
			return true
		}
		db.maxIdleClosed++
	}
	return false
}

总结:过期时间处理

我们在 sql.DB 和连接池里面都看到了一个过期时间的处理。 在开发中,还有类似的场景,例如说本地缓存的过期时间。

可能的方案都是:

  • 每一个连接都有一个 goroutine 盯着,过期了就直接 close 掉
  • 一个 goroutine 定期检查所有的连接,把过期的关掉
  • 不管,要用之前就检查一下过期了没

net-13.png

空闲连接我们都是放在 channel 里,怎么定期检查?

代码演示 —— 综合反射和网络编程实现 RPC 调用

前面我们已经演示过了如何使用反射拿到方法调用信息,现在我们实现一个简单的 RPC 调用。

要考虑:

  • 怎么告诉服务端,我请求有多少字节;服务端反过来也要告诉客户端,我响应有多长。
  • 怎么告诉服务端,我请求用什么编码;服务端也要告诉客户端,响应是怎么编码的。

总结

  • 几个参数的含义:初始连接,最大空闲连接,最大连接数
  • 连接池的运作原理:拿连接会发生什么,放回去又会发生什么
  • sql.DB 解决过期连接的懒惰策略,可以类比其它如本地缓存的

代码演示 —— 手搓连接池

type Option func(p *SimplePool)

type SimplePool struct {
	idleChan chan conn
	waitChan chan *conReq

	factory     func() (net.Conn, error)
	idleTimeout time.Duration

	maxCnt int32
	// 连接数
	cnt int32

	l sync.Mutex
}

func NewSimplePool(factory func() (net.Conn, error), opts ...Option) *SimplePool {
	res := &SimplePool{
		idleChan: make(chan conn, 16),
		waitChan: make(chan *conReq, 128),
		factory:  factory,
		maxCnt:   128,
	}
	for _, opt := range opts {
		opt(res)
	}
	return res
}

func (p *SimplePool) Get() (net.Conn, error) {
	for {
		select {
		case c := <-p.idleChan:
			// 超时,直接关闭.
			// 有没有觉得奇怪,就是明明我们就是需要一个连接,但是我们还关闭了
			if c.lastActive.Add(p.idleTimeout).Before(time.Now()) {
				atomic.AddInt32(&p.cnt, -1)
				_ = c.c.Close()
				continue
			}
			return c.c, nil
		default:
			cnt := atomic.AddInt32(&p.cnt, 1)
			if cnt <= p.maxCnt {
				return p.factory()
			}
			atomic.AddInt32(&p.cnt, -1)
			req := &conReq{
				con: make(chan conn, 1),
			}
			// 可能阻塞在这两句,对应不同的情况。
			// 所以实际上 waitChan 根本不需要设计很大的容量
			// 另外,这里需不需要加锁?
			p.waitChan <- req
			c := <-req.con
			return c.c, nil
		}
	}
}

func (p *SimplePool) Put(c net.Conn) {
	// 为什么我只在这个部分加锁,其余部分都不加?
	p.l.Lock()
	if len(p.waitChan) > 0 {
		req := <-p.waitChan
		p.l.Unlock()
		req.con <- conn{c: c, lastActive: time.Now()}
		return
	}

	p.l.Unlock()

	select {
	case p.idleChan <- conn{c: c, lastActive: time.Now()}:
	default:
		defer func() {
			atomic.AddInt32(&p.maxCnt, -1)
		}()
		_ = c.Close()
	}
}

// WithMaxIdleCnt 自定义最大空闲连接数量
func WithMaxIdleCnt(maxIdleCnt int32) Option {
	return func(p *SimplePool) {
		p.idleChan = make(chan conn, maxIdleCnt)
	}
}

// WithMaxCnt 自定义最大连接数量
func WithMaxCnt(maxCnt int32) Option {
	return func(p *SimplePool) {
		p.maxCnt = maxCnt
	}
}

type conn struct {
	c          net.Conn
	lastActive time.Time
}

type conReq struct {
	con chan conn
}
// TestSimplePool 这个难以用 table-driven 的形式来写测试
func TestSimplePool(t *testing.T) {
	p := NewSimplePool(func() (net.Conn, error) {
		return &mockConn{}, nil
	}, WithMaxIdleCnt(2), WithMaxCnt(3))

	// 这三次都能正常拿出来
	c1, err := p.Get()
	assert.Nil(t, err)
	c2 , err := p.Get()
	assert.Nil(t, err)
	c3, err := p.Get()
	assert.Nil(t, err)

	// 正常放回去
	p.Put(c1)
	p.Put(c2)

	// 空闲队列满了,这里c3会被关闭
	p.Put(c3)
	assert.True(t, c3.(*mockConn).closed)
}

func TestSimplePool_GetBlock(t *testing.T) {
	p := NewSimplePool(func() (net.Conn, error) {
		return &mockConn{}, nil
	}, WithMaxIdleCnt(2), WithMaxCnt(3))

	// 这三次都能正常拿出来
	c1, err := p.Get()
	assert.Nil(t, err)
	_ , err = p.Get()
	assert.Nil(t, err)
	_, err = p.Get()
	assert.Nil(t, err)

	now := time.Now()

	go func() {
		// 睡两秒
		time.Sleep(time.Second)
		p.Put(c1)
	}()
	// 直接阻塞
	c4, err := p.Get()
	assert.Nil(t, err)
	// 就是我们放回去的那个
	assert.Equal(t, c1, c4)
	// 确认被阻塞过
	assert.Greater(t, time.Now().Sub(now), time.Second)
}

// mockConn 用于辅助测试
type mockConn struct {
	closed bool
}

func (m *mockConn) Read(b []byte) (n int, err error) {
	// TODO implement me
	panic("implement me")
}

func (m *mockConn) Write(b []byte) (n int, err error) {
	// TODO implement me
	panic("implement me")
}

func (m *mockConn) Close() error {
	// 用于辅助测试
	fmt.Println("connection closing")
	m.closed = true
	return nil
}

func (m *mockConn) LocalAddr() net.Addr {
	// TODO implement me
	panic("implement me")
}

func (m *mockConn) RemoteAddr() net.Addr {
	// TODO implement me
	panic("implement me")
}

func (m *mockConn) SetDeadline(t time.Time) error {
	// TODO implement me
	panic("implement me")
}

func (m *mockConn) SetReadDeadline(t time.Time) error {
	// TODO implement me
	panic("implement me")
}

func (m *mockConn) SetWriteDeadline(t time.Time) error {
	// TODO implement me
	panic("implement me")
}

延伸阅读


知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。