Go猜想录
大道至简,悟者天成
解密 channel -- 进阶篇

基本使用

channel-result.png

  • 不带缓冲的channel:只有发送者和接收者都准备好了才可以通讯,否则会阻塞
  • 带缓冲的channel:
    1. 缓冲已满,发送者会阻塞
    2. 缓冲已空,接收者会阻塞
  • 空(nil)channel:发送和接收都会导致goroutine泄露
  • 已关闭(closed)的channel:
    1. 发送者会panic
    2. 接收者会收到零值,ok为false
    3. 再次close会panic

使用时要注意 channel 是否有缓冲,发送方和接收方是谁,由谁关闭,是否已经关闭。

实现要求

  • 要设计缓冲来存储数据。无缓冲=缓冲容量为0
  • 要能阻塞 goroutine,也要能唤醒 goroutine。这个基本依赖于 Go 的运行时:
    • 发数据唤醒收数据的
    • 收数据的唤醒发数据的
  • 要维持住 goroutine 的等待队列,并且是收和发两个队列

数据结构

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	timer    *timer // timer feeding this chan
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

hchan 中使用 qcount、dataqsiz、buf、sendx、recvx 维护一个循环队列

  • qcount – cap
  • dataqsiz – len
  • buf – ring buffer 结构,用于存储数据
  • sendx – send index
  • recvx – receive index

chansend、chanrecv、closechan 操作都是由 hchan 中的 lock 来保护的

发送过程

channel-0.png

  1. 看是不是 nil channel,是的话直接阻塞
  2. 看有没有被阻塞的接受者,有的话直接交付数据, 返回
  3. 看看缓冲有没有满,没有就放缓冲,返回
  4. 阻塞,等待接收者来唤醒自己
  5. 被唤醒,做些清理工作

接收过程

49fa02d71f262f4cf112eb101a3d3712.png

  1. 看是不是 nil channel,是的话直接阻塞
  2. 看有没有被阻塞的发送者,有的话直接从发送者手里拿,返回
  3. 看看缓冲有没有数据,有就读缓冲,返回
  4. 阻塞,等待发送者来唤醒自己
  5. 被唤醒,做些清理工作

挂起与唤醒

  • Sender 挂起,⼀定是由 receiver(或 close)唤醒
  • Receiver 挂起,⼀定是由 sender(或 close)唤醒
  • 与 gopark 其对应的 goready 位置为:
    • channel send -> channel recv/close
    • Lock -> Unlock
    • Read -> Read Ready,epoll_wait 返回了该 fd 事件时
    • Timer -> checkTimers,检查到期唤醒

内存逃逸

用 channel 发送指针,必会发生内存逃逸。因为编译器无法确定,发送的指针数据最终会被哪个 goroutine 接收

实现消息队列

原生缺陷:

  • 没有消费组概念。同一个事件不能被多个 goroutine 同时消费,只能消费一次。
  • 无法回退,也无法随机消费。

利用 channel 来实现一个基于内存的消息队列,并且有消费组的概念。

type Broker struct {
	mutex sync.RWMutex
	chans []chan Msg
}

type Msg struct {
	Content string
}

func (b *Broker) Send(m Msg) error {
	b.mutex.RLock()
	defer b.mutex.RUnlock()
	for _, ch := range b.chans {
		select {
		case ch <- m:
		default:
			return errors.New("消息队列已满")
		}
	}
	return nil
}

func (b *Broker) Subscribe(capacity int) (<-chan Msg, error) {
	res := make(chan Msg, capacity)
	b.mutex.Lock()
	defer b.mutex.Unlock()
	b.chans = append(b.chans, res)
	return res, nil
}

func (b *Broker) Close() error {
	b.mutex.Lock()
	chans := b.chans
	b.chans = nil
	b.mutex.Unlock()

	for _, ch := range chans {
		close(ch)
	}
	return nil
}

实现一个任务池

利用 channel 来实现一个任务池。该任务 池允许开发者提交任务,并且设定最多多少个 goroutine 同时运行。

type TaskPool struct {
	tasks chan Task
	close chan struct{}
}

type Task func()

// NewTaskPool 新建任务池
// cnt 是 worker 数量
// cap 是 task 缓存的容量
func NewTaskPool(cnt int, cap int) *TaskPool {
	res := &TaskPool{
		tasks: make(chan Task, cap),
		close: make(chan struct{}),
	}

	for i := 0; i < cnt; i++ {
		go func() {
			for {
				select {
				case <-res.close:
					close(res.tasks)
					return
				case t := <-res.tasks:
					t()
				}
			}
		}()
	}
	return res
}

// Submit 提交任务
func (p *TaskPool) Submit(ctx context.Context, t Task) error {
	select {
	case p.tasks <- t:
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}

// Close 方法会释放资源,只能调用一次。
func (p *TaskPool) Close() error {
	close(p.close)
	return nil
}

参考资料


知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。