Go猜想录
大道至简,悟者天成
解密 channel -- 基础篇

要想彻底理解 channel,要抓住几个点:

  • channel 带不带缓冲
  • 谁在发
  • 谁在收
  • 谁来关
  • 以及,关了没?

缓冲

  • 不带缓冲:要求收发两端都必须要有 goroutine,否则就是阻塞。
  • 带缓冲:没满或者没空之前都不会阻塞。但是满了或者空了就会阻塞。

总结:

  • 对于发送者来说,只要发出去的数据没有地方放,那就是阻塞
  • 对于接收者来说,只要尝试接收数据但是没拿到,那也会阻塞

chan-0.png

func TestChannel(t *testing.T) {
	ch := make(chan string, 4)
	go func() {
		str := <-ch
		fmt.Println(str)
	}()
	go func() {
		str := <-ch
		fmt.Println(str)
	}()
	go func() {
		str := <-ch
		fmt.Println(str)
	}()

	ch <- "hello"
	ch <- "hello"
	time.Sleep(time.Second)
}

使用方向

利用 channel 的思路:

  • 看做是队列,主要用于传递数据
  • 利用阻塞特性,可以间接控制住 goroutine 或者其它资源的消耗。这种用法有点像是令牌机制。那么往 channel 里面读或者取一个数据,就有点像是拿到一个令牌,拿到令牌才可以做某件事

发布订阅模式

利用 channel 实现发布订阅模式非常简单,发布者不断往 channel 里面塞入数据,订阅者从 channel 里面取出数据。

进程内的事件驱动可以依托于 channel 来实现。

缺陷:

  • 没有消费组概念。不能说同一个事件被多个 goroutine 同时消费,有且只能有一个
  • 无法回退,也无法随机消费

实现消息队列

例子:利用 channel 来实现一个基于内存的消息队列,并且有消费组的概念。

思路:这个例子,难就难在 channel 里面的元素只能被一个 goroutine 取出来。所以要想同一个消息能够被多 goroutine 消费,就需要费一点手脚。

  • 方案一:每一个消费者订阅的时候,创建一个子 channel
  • 方案二:轮询所有的消费者

chan-1.png

// 方案一
func TestBroker(t *testing.T) {
	b := &Broker{
		consumers: make([]*Consumer, 0, 10),
	}
	c1 := &Consumer{
		ch: make(chan string, 1),
	}
	c2 := &Consumer{
		ch: make(chan string, 1),
	}
	b.Subscribe(c1)
	b.Subscribe(c2)

	b.Produce("hello")
	fmt.Println(<-c1.ch)
	fmt.Println(<-c2.ch)
}

type Broker struct {
	consumers []*Consumer
}

func (b *Broker) Produce(msg string) {
	for _, c := range b.consumers {
		c.ch <- msg
	}
}

func (b *Broker) Subscribe(c *Consumer) {
	b.consumers = append(b.consumers, c)
}

type Consumer struct {
	ch chan string
}
// 方案二
type Broker1 struct {
	ch        chan string
	consumers []func(s string)
}

func NewBroker1() *Broker1 {
	b := &Broker1{ch: make(chan string, 10), consumers: make([]func(s string), 0, 10)}
	go b.Start()
	return b
}

func (b *Broker1) Produce(msg string) {
	b.ch <- msg
}

func (b *Broker1) Subscribe(consume func(s string)) {
	b.consumers = append(b.consumers, consume)
}

func (b *Broker1) Start() {
	for {
		s, ok := <-b.ch
		if !ok {
			return
		}
		for _, c := range b.consumers {
			c(s)
		}
	}
}

func TestBroker1(t *testing.T) {
	b := NewBroker1()
	str1 := ""
	b.Subscribe(func(s string) {
		str1 = str1 + s
	})

	str2 := ""
	b.Subscribe(func(s string) {
		str2 = str2 + s
	})

	b.Produce("hello")
	b.Produce(" ")
	b.Produce("world")

	time.Sleep(time.Second)
	assert.Equal(t, "hello world", str1)
	assert.Equal(t, "hello world", str2)
}

实现一个任务池

例子:利用 channel 来实现一个任务池。该任务池允许开发者提交任务,并且设定最多多少个 goroutine 同时运行。

思路:这个东西实现起来并不难,而是难在决策,要考虑:

  • 提交任务的时候,如果执行 goroutine 满了,任务池是缓存住这个任务,还是直接阻塞提交者?
  • 如果缓存,那么缓存需要多大?缓存满了又该怎么办?

chan-2.png

type TaskPool struct {
	ch chan struct{}
}

func NewTaskPool(limit int) *TaskPool {
	t := &TaskPool{
		ch: make(chan struct{}, limit),
	}
	// 提前准备好了令牌
	for i := 0; i < limit; i++ {
		t.ch <- struct{}{}
	}
	return t
}

func (t *TaskPool) Do(f func()) {
	token := <-t.ch
	// 异步执行
	go func() {
		f()
		t.ch <- token
	}()

	// 同步执行
	// f()
	// t.ch <- token
}

func TestTaskPool_Do(t1 *testing.T) {
	tp := NewTaskPool(2)
	tp.Do(func() {
		time.Sleep(time.Second)
		fmt.Println("task1")
	})

	tp.Do(func() {
		time.Sleep(time.Second)
		fmt.Println("task2")
	})

	tp.Do(func() {
		MyTask(1, "13")
	})
}
func MyTask(a int, b string) {
}
type TaskPoolWithCache struct {
	cache chan func()
}

func NewTaskPoolWithCache(limit int, cacheSize int) *TaskPoolWithCache {
	t := &TaskPoolWithCache{
		cache: make(chan func(), cacheSize),
	}
	// 直接把 goroutine 开好
	for i := 0; i < limit; i++ {
		go func() {
			for {
				// 在 goroutine 里面不断尝试从 cache 里面拿到任务
				select {
				case task, ok := <-t.cache:
					if !ok {
						return
					}
					task()
				}
			}
		}()
	}
	return t
}

func (t *TaskPoolWithCache) Do(f func()) {
	t.cache <- f
}

// 显式控制生命周期
// func (t *TaskPoolWithCache) Start() {
// 	for i := 0; i < t.limit; i++ {
// 		go func() {
// 			for {
// 				// 在 goroutine 里面不断尝试从 cache 里面拿到任务
// 				select {
// 				case task, ok := <-t.cache:
// 					if !ok {
// 						return
// 					}
// 					task()
// 				}
// 			}
// 		}()
// 	}
// }

func TestTaskPoolWithCache_Do(t1 *testing.T) {
	tp := NewTaskPoolWithCache(2, 10)
	tp.Do(func() {
		time.Sleep(time.Second)
		fmt.Println("task1")
	})

	tp.Do(func() {
		time.Sleep(time.Second)
		fmt.Println("task2")
	})

	id := 1
	name := "Tom"
	tp.Do(func() {
		MyTask(id, name)
	})

	time.Sleep(2 * time.Second)
}

func MyTask(a int, b string) {
}

channel 与 goroutine 泄露

如果 channel 使用不当,就会导致 goroutine 泄露:

  • 只发送不接收,那么发送者一直阻塞,会导致发送者 goroutine 泄露
  • 只接收不发送,那么接收者一直阻塞,会导致接收者 goroutine 泄露
  • 读写 nil 都会导致 goroutine 泄露

基本上可以说,goroutine 泄露都是因为 goroutine 被阻塞之后没有人唤醒它导致的。

唯一的例外是业务层面上 goroutine 长时间运行。

chan-3.png

channel 与内存逃逸

内存分配:

  • 分配到栈上:不需要考虑 GC
  • 分配到堆上:需要考虑 GC

很不幸的,如果用 channel 发送指针,那么必然逃逸。

编译器无法确定,发送的指针数据最终会被哪个 goroutine 接收!

chan-4.png

实现细节

经过前面的学习,我们应该对 Go 怎么设计并发结构体有点心得体会了。那么我们先思考一下 设计这样的 chan 结构体有什么问题要考虑:

  • 要设计缓冲来存储数据。无缓冲 = 缓冲容量为 0
  • 要能阻塞 goroutine,也要能唤醒 goroutine。这个基本依赖于 Go 的运行时:
    • 发数据唤醒收数据的
    • 收数据的唤醒发数据的
  • 要维持住 goroutine 的等待队列,并且是收和发两个队列

数据结构

  • buf 是一个 ring buffer 结构,用于存储数据
  • waitq 是一个双向链表,简单来说就是队列

所以简单说:

  • 发送的时候,如果缓冲没满,或者有接收者,那就直接发;否则丢进去 sendq。
  • 接收的时候,如果缓冲有数据,或者说有发送者,那就收;否则丢进去 recvq。
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

chansend

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
// nil 的 channel,直接阻塞,而且也无法被唤醒
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}
...
// 有接收者在等数据
// 在有接收者阻塞的情况下,即便缓冲没满,发送者也是直接交付给接收者。
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
// 没有接收者,准备丢缓冲
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
...
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
// 缓冲也没有,阻塞自己
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
// 这个 KeepAlive 确保 ep 不会垃圾回收掉
// 实际上就是确保,发送的数据不会被垃圾回收掉,一般是和 SetFinalizer 结合使用
	KeepAlive(ep)
	// someone woke us up.
// 被唤醒,这个时候其实已经发完数据了,后面就做一下清理工作
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}

步骤:

  1. 看是不是 nil channel,是的话直接阻塞
  2. 看有没有被阻塞的接受者,有的话直接交付数据,返回
  3. 看看缓冲有没有满,没有就放缓冲,返回
  4. 阻塞,等待接收者来唤醒自己
  5. 被唤醒,做些清理工作

chan-5.png

chanrecv

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
// nil 的 channel,直接阻塞,而且也无法被唤醒
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
...
		// Just found waiting sender with not closed.
// 有人等待发送,且 buffer 是 0 的话,直接接收数据
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}
...
	if c.qcount > 0 {
// 没人发,但是缓冲区中有数据,从缓冲里拿
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
...
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
// 阻塞,进队列
	c.recvq.enqueue(mysg)
...
// 被唤醒,其实这时已经收到数据了
	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}

步骤:

  1. 看是不是 nil channel,是的话直接阻塞
  2. 看有没有被阻塞的发送者,有的话直接从发送者手里拿,返回(前提是 buffer 是 0)
  3. 看看缓冲有没有数据,有就读缓冲,返回
  4. 阻塞,等待发送者来唤醒自己
  5. 被唤醒,做些清理工作

chan-6.png

总结

  • channel 有 buffer 和没有 buffer 有什么特点
  • 发送数据到 nil channel 会怎样?发送到已关闭的 channel 会怎样?
  • 从 nil channel 接收数据会怎样?从已关闭的 channel 接收数据会怎样?
  • channel 是怎么引起 goroutine 泄露的?或者说,goroutine 泄露有什么原因?
  • channel 发送步骤
  • channel 接收步骤
  • 为什么 channel 发送指针数据会引起内存逃逸?
  • 可能的代码题:
    • 用 channel 实现一个任务池
    • 用 channel 来控制 goroutine 数量
    • 用 channel 来实现生产者消费者模型

知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。