Go猜想录
大道至简,悟者天成
Go语言Goroutine管理

Goroutine

操作系统会为该应用程序创建一个进程。作为一个应用程序,它像一个为所有资源而运行的容器。这些资源包括内存地址空间、文件句柄、设备和线程。

线程是操作系统调度的一种执行路径,用于在处理器执行我们在函数中编写的代码。一个进程从一个线程开始,即主线程,当该线程终止时,进程终止。这是因为主线程是应用程序的原点。然后,主线程可以依次启动更多的线程,而这些线程可以启动更多的线程。

无论线程属于哪个进程,操作系统都会安排线程在可用处理器上运行。每个操作系统都有自己的算法来做出这些决定。

Go 语言层面支持的 go 关键字,可以快速的让一个函数创建为 goroutine,我们可以认为 main 函数就是作为 goroutine 执行的。操作系统调度线程在可用处理器上运行,Go运行时调度 goroutines 在绑定到单个操作系统线程的逻辑处理器中运行(P)。即使使用这个单一的逻辑处理器和操作系统线程,也可以调度数十万 goroutine 以惊人的效率和性能并发运行。

如何清真的使用 goroutine

能不用就不用

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello, GopherCon SG")
	})
	go func() {
		if err := http.ListenAndServe(":8080", nil); err != nil {
			log.Fatal(err)
		}
	}()

	select {}
}

上面的程序可以看到,为了防止main.main退出,结尾使用了空的select语句阻塞住了主函数。某种意义上来说是一个好的做法因为没有使用for循环或其他浪费CPU的方式防止主进程退出。不过这仍然属于脱裤子放屁。因为在执行ListenAndServe函数的goroutine返回前什么都做不了,那就不如不开启一个goroutine去完成这个任务,直接在主线程里做。

如果你的 goroutine 在从另一个 goroutine 获得结果之前无法取得进展,那么通常情况下,你自己去做这项工作比委托它( go func() )更简单。这通常消除了将结果从 goroutine 返回到其启动器所需的大量状态跟踪和 chan 操作。

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "Hello, GopherCon SG")
	})
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatal(err)
	}
}

调用者决定是否要并发

// ListDirectory returns the contents of dir.
func ListDirectory(dir string) ([]string, error)

// ListDirectory returns a channel over which
// directory entries will be published. When the list
// of entries is exhausted, the channel will be closed.
func ListDirectory(dir string) chan string

以上两个API设计的功能都是要读取所有的目录

  1. 将目录读取到一个 slice 中,然后返回整个切片,或者如果出现错误,则返回错误。这是同步调用的,ListDirectory 的调用方会阻塞,直到读取所有目录条目。根据目录的大小,这可能需要很长时间,并且可能会分配大量内存来构建目录条目名称的 slice。
  2. ListDirectory 返回一个 chan string,将通过该 chan 传递目录。当通道关闭时,这表示不再有目录。由于在 ListDirectory 返回后发生通道的填充,ListDirectory 可能内部启动 goroutine 来填充通道。

ListDirectory chan 版本还有两个问题:

  • 通过使用一个关闭的通道作为不再需要处理的项目的信号,ListDirectory 无法告诉调用者通过通道返回的项目集不完整,因为中途遇到了错误。调用方无法区分空目录与完全从目录读取的错误之间的区别。这两种方法都会导致从 ListDirectory 返回的通道会立即关闭。
  • 调用者必须继续从通道读取,直到它关闭,因为这是调用者知道开始填充通道的 goroutine 已经停止的唯一方法。这对 ListDirectory 的使用是一个严重的限制,调用者必须花时间从通道读取数据,即使它可能已经收到了它想要的答案。对于大中型目录,它可能在内存使用方面更为高效,但这种方法并不比原始的基于 slice 的方法快。

解决之道是使用回调函数func ListDirectory(dir string, fn func(string)) error

filepath.WalkDir 也是类似的模型,如果函数启动 goroutine,则必须向调用方提供显式停止该goroutine 的方法。通常,将异步执行函数的决定权交给该函数的调用方通常更容易。

管理好整个生命周期

// leak is a buggy function. It launches a goroutine that
// blocks receiving from a channel. Nothing will ever be
// sent on that channel and the channel is never closed so
// that goroutine will be blocked forever.
func leak() {
	ch := make(chan int)

	go func() {
		val := <-ch
		fmt.Println("We received a value:", val)
	}()
}
  • 开启一个goroutine时要知道它何时会结束

  • 或者有什么手段可以让它结束

这样就可以确保不会发生goroutine泄漏

超时控制

package main

import (
	"context"
	"time"
)

func main() {
	tr := NewTracker()
	go tr.Run()
	_ = tr.Event(context.Background(), "t1")
	_ = tr.Event(context.Background(), "t2")
	_ = tr.Event(context.Background(), "t3")
	time.Sleep(3 * time.Second)
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
	defer cancel()
    // 由写的人去关闭channel
	tr.Shutdown(ctx)
}

func NewTracker() *Tracker {
	return &Tracker{
		ch: make(chan string, 10),
	}
}

// Tracker knows how to track events for the application.
type Tracker struct {
	ch   chan string
	stop chan struct{}
}

func (t *Tracker) Event(ctx context.Context, data string) error {
	select {
	case t.ch <- data:
		return nil
	case <-ctx.Done():
		return ctx.Err()

	}
}

func (t *Tracker) Run() {
	for data := range t.ch {
		time.Sleep(time.Second)
		println(data)
	}
	t.stop <- struct{}{}
}

func (t *Tracker) Shutdown(ctx context.Context) {
	close(t.ch)
	select {
	case <-t.stop:
	case <-ctx.Done():
	}
}

一个好点的使用案例

package main

import (
	"context"
	"fmt"
	"net/http"
)

func serve(addr string, handler http.Handler, stop <-chan struct{}) error {
	s := http.Server{
		Addr:    addr,
		Handler: handler,
	}

	go func() {
		<-stop // wait for stop signal
		s.Shutdown(context.Background())
	}()

	return s.ListenAndServe()
}

func serveApp(stop <-chan struct{}) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
		fmt.Fprintln(resp, "Hello, QCon!")
	})
	return serve("0.0.0.0:8080", mux, stop)
}

func serveDebug(stop <-chan struct{}) error {
	return serve("127.0.0.1:8001", http.DefaultServeMux, stop)
}

func main() {
	done := make(chan error, 2)
	stop := make(chan struct{})
	go func() {
		done <- serveDebug(stop)
	}()
	go func() {
		done <- serveApp(stop)
	}()

	var stopped bool
	for i := 0; i < cap(done); i++ {
		if err := <-done; err != nil {
			fmt.Printf("error: %v\n", err)
		}
		if !stopped {
			stopped = true
			close(stop)
		}
	}
}

优点:

  • 管理好了所有goroutine的生命周期,不会有泄露
  • 任何一个goroutine出现问题,都会让所有其他的goroutine优雅退出,不会造成任何数据的丢失
  • 所有需要后台执行的任务都是由调用者来决定
  • 调用者开启goroutine时有结束它的手段

Reference

  1. Why goroutines instead of threads?
  2. Practical Go: Real world advice for writing maintainable Go programs
  3. Goroutine Leaks - The Forgotten Sender
  4. Concurrency Trap #2: Incomplete Work
  5. da440dil/go-workgroup

知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。