解密 channel -- 进阶篇
基本使用
- 不带缓冲的channel:只有发送者和接收者都准备好了才可以通讯,否则会阻塞
- 带缓冲的channel:
- 缓冲已满,发送者会阻塞
- 缓冲已空,接收者会阻塞
- 空(nil)channel:发送和接收都会导致goroutine泄露
- 已关闭(closed)的channel:
- 发送者会panic
- 接收者会收到零值,ok为false
- 再次close会panic
使用时要注意 channel 是否有缓冲,发送方和接收方是谁,由谁关闭,是否已经关闭。
实现要求
- 要设计缓冲来存储数据。无缓冲=缓冲容量为0
- 要能阻塞 goroutine,也要能唤醒 goroutine。这个基本依赖于 Go 的运行时:
- 发数据唤醒收数据的
- 收数据的唤醒发数据的
- 要维持住 goroutine 的等待队列,并且是收和发两个队列
数据结构
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
timer *timer // timer feeding this chan
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
hchan 中使用 qcount、dataqsiz、buf、sendx、recvx 维护一个循环队列
- qcount – cap
- dataqsiz – len
- buf – ring buffer 结构,用于存储数据
- sendx – send index
- recvx – receive index
chansend、chanrecv、closechan 操作都是由 hchan 中的 lock 来保护的
发送过程
- 看是不是 nil channel,是的话直接阻塞
- 看有没有被阻塞的接受者,有的话直接交付数据, 返回
- 看看缓冲有没有满,没有就放缓冲,返回
- 阻塞,等待接收者来唤醒自己
- 被唤醒,做些清理工作
接收过程
- 看是不是 nil channel,是的话直接阻塞
- 看有没有被阻塞的发送者,有的话直接从发送者手里拿,返回
- 看看缓冲有没有数据,有就读缓冲,返回
- 阻塞,等待发送者来唤醒自己
- 被唤醒,做些清理工作
挂起与唤醒
- Sender 挂起,⼀定是由 receiver(或 close)唤醒
- Receiver 挂起,⼀定是由 sender(或 close)唤醒
- 与 gopark 其对应的 goready 位置为:
- channel send -> channel recv/close
- Lock -> Unlock
- Read -> Read Ready,epoll_wait 返回了该 fd 事件时
- Timer -> checkTimers,检查到期唤醒
内存逃逸
用 channel 发送指针,必会发生内存逃逸。因为编译器无法确定,发送的指针数据最终会被哪个 goroutine 接收
实现消息队列
原生缺陷:
- 没有消费组概念。同一个事件不能被多个 goroutine 同时消费,只能消费一次。
- 无法回退,也无法随机消费。
利用 channel 来实现一个基于内存的消息队列,并且有消费组的概念。
type Broker struct {
mutex sync.RWMutex
chans []chan Msg
}
type Msg struct {
Content string
}
func (b *Broker) Send(m Msg) error {
b.mutex.RLock()
defer b.mutex.RUnlock()
for _, ch := range b.chans {
select {
case ch <- m:
default:
return errors.New("消息队列已满")
}
}
return nil
}
func (b *Broker) Subscribe(capacity int) (<-chan Msg, error) {
res := make(chan Msg, capacity)
b.mutex.Lock()
defer b.mutex.Unlock()
b.chans = append(b.chans, res)
return res, nil
}
func (b *Broker) Close() error {
b.mutex.Lock()
chans := b.chans
b.chans = nil
b.mutex.Unlock()
for _, ch := range chans {
close(ch)
}
return nil
}
实现一个任务池
利用 channel 来实现一个任务池。该任务 池允许开发者提交任务,并且设定最多多少个 goroutine 同时运行。
type TaskPool struct {
tasks chan Task
close chan struct{}
}
type Task func()
// NewTaskPool 新建任务池
// cnt 是 worker 数量
// cap 是 task 缓存的容量
func NewTaskPool(cnt int, cap int) *TaskPool {
res := &TaskPool{
tasks: make(chan Task, cap),
close: make(chan struct{}),
}
for i := 0; i < cnt; i++ {
go func() {
for {
select {
case <-res.close:
close(res.tasks)
return
case t := <-res.tasks:
t()
}
}
}()
}
return res
}
// Submit 提交任务
func (p *TaskPool) Submit(ctx context.Context, t Task) error {
select {
case p.tasks <- t:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// Close 方法会释放资源,只能调用一次。
func (p *TaskPool) Close() error {
close(p.close)
return nil
}
参考资料
- GopherCon 2017: Kavya Joshi - Understanding Channels
- Concurrency in Go
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。