解密 channel -- 基础篇
序
要想彻底理解 channel,要抓住几个点:
- channel 带不带缓冲
- 谁在发
- 谁在收
- 谁来关
- 以及,关了没?
缓冲
- 不带缓冲:要求收发两端都必须要有 goroutine,否则就是阻塞。
- 带缓冲:没满或者没空之前都不会阻塞。但是满了或者空了就会阻塞。
总结:
- 对于发送者来说,只要发出去的数据没有地方放,那就是阻塞
- 对于接收者来说,只要尝试接收数据但是没拿到,那也会阻塞
func TestChannel(t *testing.T) {
ch := make(chan string, 4)
go func() {
str := <-ch
fmt.Println(str)
}()
go func() {
str := <-ch
fmt.Println(str)
}()
go func() {
str := <-ch
fmt.Println(str)
}()
ch <- "hello"
ch <- "hello"
time.Sleep(time.Second)
}
使用方向
利用 channel 的思路:
- 看做是队列,主要用于传递数据
- 利用阻塞特性,可以间接控制住 goroutine 或者其它资源的消耗。这种用法有点像是令牌机制。那么往 channel 里面读或者取一个数据,就有点像是拿到一个令牌,拿到令牌才可以做某件事
发布订阅模式
利用 channel 实现发布订阅模式非常简单,发布者不断往 channel 里面塞入数据,订阅者从 channel 里面取出数据。
进程内的事件驱动可以依托于 channel 来实现。
缺陷:
- 没有消费组概念。不能说同一个事件被多个 goroutine 同时消费,有且只能有一个
- 无法回退,也无法随机消费
实现消息队列
例子:利用 channel 来实现一个基于内存的消息队列,并且有消费组的概念。
思路:这个例子,难就难在 channel 里面的元素只能被一个 goroutine 取出来。所以要想同一个消息能够被多 goroutine 消费,就需要费一点手脚。
- 方案一:每一个消费者订阅的时候,创建一个子 channel
- 方案二:轮询所有的消费者
// 方案一
func TestBroker(t *testing.T) {
b := &Broker{
consumers: make([]*Consumer, 0, 10),
}
c1 := &Consumer{
ch: make(chan string, 1),
}
c2 := &Consumer{
ch: make(chan string, 1),
}
b.Subscribe(c1)
b.Subscribe(c2)
b.Produce("hello")
fmt.Println(<-c1.ch)
fmt.Println(<-c2.ch)
}
type Broker struct {
consumers []*Consumer
}
func (b *Broker) Produce(msg string) {
for _, c := range b.consumers {
c.ch <- msg
}
}
func (b *Broker) Subscribe(c *Consumer) {
b.consumers = append(b.consumers, c)
}
type Consumer struct {
ch chan string
}
// 方案二
type Broker1 struct {
ch chan string
consumers []func(s string)
}
func NewBroker1() *Broker1 {
b := &Broker1{ch: make(chan string, 10), consumers: make([]func(s string), 0, 10)}
go b.Start()
return b
}
func (b *Broker1) Produce(msg string) {
b.ch <- msg
}
func (b *Broker1) Subscribe(consume func(s string)) {
b.consumers = append(b.consumers, consume)
}
func (b *Broker1) Start() {
for {
s, ok := <-b.ch
if !ok {
return
}
for _, c := range b.consumers {
c(s)
}
}
}
func TestBroker1(t *testing.T) {
b := NewBroker1()
str1 := ""
b.Subscribe(func(s string) {
str1 = str1 + s
})
str2 := ""
b.Subscribe(func(s string) {
str2 = str2 + s
})
b.Produce("hello")
b.Produce(" ")
b.Produce("world")
time.Sleep(time.Second)
assert.Equal(t, "hello world", str1)
assert.Equal(t, "hello world", str2)
}
实现一个任务池
例子:利用 channel 来实现一个任务池。该任务池允许开发者提交任务,并且设定最多多少个 goroutine 同时运行。
思路:这个东西实现起来并不难,而是难在决策,要考虑:
- 提交任务的时候,如果执行 goroutine 满了,任务池是缓存住这个任务,还是直接阻塞提交者?
- 如果缓存,那么缓存需要多大?缓存满了又该怎么办?
type TaskPool struct {
ch chan struct{}
}
func NewTaskPool(limit int) *TaskPool {
t := &TaskPool{
ch: make(chan struct{}, limit),
}
// 提前准备好了令牌
for i := 0; i < limit; i++ {
t.ch <- struct{}{}
}
return t
}
func (t *TaskPool) Do(f func()) {
token := <-t.ch
// 异步执行
go func() {
f()
t.ch <- token
}()
// 同步执行
// f()
// t.ch <- token
}
func TestTaskPool_Do(t1 *testing.T) {
tp := NewTaskPool(2)
tp.Do(func() {
time.Sleep(time.Second)
fmt.Println("task1")
})
tp.Do(func() {
time.Sleep(time.Second)
fmt.Println("task2")
})
tp.Do(func() {
MyTask(1, "13")
})
}
func MyTask(a int, b string) {
}
type TaskPoolWithCache struct {
cache chan func()
}
func NewTaskPoolWithCache(limit int, cacheSize int) *TaskPoolWithCache {
t := &TaskPoolWithCache{
cache: make(chan func(), cacheSize),
}
// 直接把 goroutine 开好
for i := 0; i < limit; i++ {
go func() {
for {
// 在 goroutine 里面不断尝试从 cache 里面拿到任务
select {
case task, ok := <-t.cache:
if !ok {
return
}
task()
}
}
}()
}
return t
}
func (t *TaskPoolWithCache) Do(f func()) {
t.cache <- f
}
// 显式控制生命周期
// func (t *TaskPoolWithCache) Start() {
// for i := 0; i < t.limit; i++ {
// go func() {
// for {
// // 在 goroutine 里面不断尝试从 cache 里面拿到任务
// select {
// case task, ok := <-t.cache:
// if !ok {
// return
// }
// task()
// }
// }
// }()
// }
// }
func TestTaskPoolWithCache_Do(t1 *testing.T) {
tp := NewTaskPoolWithCache(2, 10)
tp.Do(func() {
time.Sleep(time.Second)
fmt.Println("task1")
})
tp.Do(func() {
time.Sleep(time.Second)
fmt.Println("task2")
})
id := 1
name := "Tom"
tp.Do(func() {
MyTask(id, name)
})
time.Sleep(2 * time.Second)
}
func MyTask(a int, b string) {
}
channel 与 goroutine 泄露
如果 channel 使用不当,就会导致 goroutine 泄露:
- 只发送不接收,那么发送者一直阻塞,会导致发送者 goroutine 泄露
- 只接收不发送,那么接收者一直阻塞,会导致接收者 goroutine 泄露
- 读写 nil 都会导致 goroutine 泄露
基本上可以说,goroutine 泄露都是因为 goroutine 被阻塞之后没有人唤醒它导致的。
唯一的例外是业务层面上 goroutine 长时间运行。
channel 与内存逃逸
内存分配:
- 分配到栈上:不需要考虑 GC
- 分配到堆上:需要考虑 GC
很不幸的,如果用 channel 发送指针,那么必然逃逸。
编译器无法确定,发送的指针数据最终会被哪个 goroutine 接收!
实现细节
经过前面的学习,我们应该对 Go 怎么设计并发结构体有点心得体会了。那么我们先思考一下 设计这样的 chan 结构体有什么问题要考虑:
- 要设计缓冲来存储数据。无缓冲 = 缓冲容量为 0
- 要能阻塞 goroutine,也要能唤醒 goroutine。这个基本依赖于 Go 的运行时:
- 发数据唤醒收数据的
- 收数据的唤醒发数据的
- 要维持住 goroutine 的等待队列,并且是收和发两个队列
数据结构
- buf 是一个 ring buffer 结构,用于存储数据
- waitq 是一个双向链表,简单来说就是队列
所以简单说:
- 发送的时候,如果缓冲没满,或者有接收者,那就直接发;否则丢进去 sendq。
- 接收的时候,如果缓冲有数据,或者说有发送者,那就收;否则丢进去 recvq。
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
chansend
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
// nil 的 channel,直接阻塞,而且也无法被唤醒
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
...
// 有接收者在等数据
// 在有接收者阻塞的情况下,即便缓冲没满,发送者也是直接交付给接收者。
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// 没有接收者,准备丢缓冲
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
// 缓冲也没有,阻塞自己
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
// 这个 KeepAlive 确保 ep 不会垃圾回收掉
// 实际上就是确保,发送的数据不会被垃圾回收掉,一般是和 SetFinalizer 结合使用
KeepAlive(ep)
// someone woke us up.
// 被唤醒,这个时候其实已经发完数据了,后面就做一下清理工作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
步骤:
- 看是不是 nil channel,是的话直接阻塞
- 看有没有被阻塞的接受者,有的话直接交付数据,返回
- 看看缓冲有没有满,没有就放缓冲,返回
- 阻塞,等待接收者来唤醒自己
- 被唤醒,做些清理工作
chanrecv
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
// nil 的 channel,直接阻塞,而且也无法被唤醒
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
// Just found waiting sender with not closed.
// 有人等待发送,且 buffer 是 0 的话,直接接收数据
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
if c.qcount > 0 {
// 没人发,但是缓冲区中有数据,从缓冲里拿
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 阻塞,进队列
c.recvq.enqueue(mysg)
...
// 被唤醒,其实这时已经收到数据了
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
步骤:
- 看是不是 nil channel,是的话直接阻塞
- 看有没有被阻塞的发送者,有的话直接从发送者手里拿,返回(前提是 buffer 是 0)
- 看看缓冲有没有数据,有就读缓冲,返回
- 阻塞,等待发送者来唤醒自己
- 被唤醒,做些清理工作
总结
- channel 有 buffer 和没有 buffer 有什么特点
- 发送数据到 nil channel 会怎样?发送到已关闭的 channel 会怎样?
- 从 nil channel 接收数据会怎样?从已关闭的 channel 接收数据会怎样?
- channel 是怎么引起 goroutine 泄露的?或者说,goroutine 泄露有什么原因?
- channel 发送步骤
- channel 接收步骤
- 为什么 channel 发送指针数据会引起内存逃逸?
- 可能的代码题:
- 用 channel 实现一个任务池
- 用 channel 来控制 goroutine 数量
- 用 channel 来实现生产者消费者模型
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。