Goroutine
操作系统会为该应用程序创建一个进程。作为一个应用程序,它像一个为所有资源而运行的容器。这些资源包括内存地址空间、文件句柄、设备和线程。
线程是操作系统调度的一种执行路径,用于在处理器执行我们在函数中编写的代码。一个进程从一个线程开始,即主线程,当该线程终止时,进程终止。这是因为主线程是应用程序的原点。然后,主线程可以依次启动更多的线程,而这些线程可以启动更多的线程。
无论线程属于哪个进程,操作系统都会安排线程在可用处理器上运行。每个操作系统都有自己的算法来做出这些决定。
Go 语言层面支持的 go 关键字,可以快速的让一个函数创建为 goroutine,我们可以认为 main 函数就是作为 goroutine 执行的。操作系统调度线程在可用处理器上运行,Go运行时调度 goroutines 在绑定到单个操作系统线程的逻辑处理器中运行(P)。即使使用这个单一的逻辑处理器和操作系统线程,也可以调度数十万 goroutine 以惊人的效率和性能并发运行。
如何清真的使用 goroutine
能不用就不用
package main
import (
"fmt"
"log"
"net/http"
)
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, GopherCon SG")
})
go func() {
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}()
select {}
}
上面的程序可以看到,为了防止main.main退出,结尾使用了空的select语句阻塞住了主函数。某种意义上来说是一个好的做法因为没有使用for循环或其他浪费CPU的方式防止主进程退出。不过这仍然属于脱裤子放屁。因为在执行ListenAndServe函数的goroutine返回前什么都做不了,那就不如不开启一个goroutine去完成这个任务,直接在主线程里做。
如果你的 goroutine 在从另一个 goroutine 获得结果之前无法取得进展,那么通常情况下,你自己去做这项工作比委托它( go func() )更简单。这通常消除了将结果从 goroutine 返回到其启动器所需的大量状态跟踪和 chan 操作。
package main
import (
"fmt"
"log"
"net/http"
)
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "Hello, GopherCon SG")
})
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatal(err)
}
}
调用者决定是否要并发
// ListDirectory returns the contents of dir.
func ListDirectory(dir string) ([]string, error)
// ListDirectory returns a channel over which
// directory entries will be published. When the list
// of entries is exhausted, the channel will be closed.
func ListDirectory(dir string) chan string
以上两个API设计的功能都是要读取所有的目录
- 将目录读取到一个 slice 中,然后返回整个切片,或者如果出现错误,则返回错误。这是同步调用的,ListDirectory 的调用方会阻塞,直到读取所有目录条目。根据目录的大小,这可能需要很长时间,并且可能会分配大量内存来构建目录条目名称的 slice。
- ListDirectory 返回一个 chan string,将通过该 chan 传递目录。当通道关闭时,这表示不再有目录。由于在 ListDirectory 返回后发生通道的填充,ListDirectory 可能内部启动 goroutine 来填充通道。
ListDirectory chan 版本还有两个问题:
- 通过使用一个关闭的通道作为不再需要处理的项目的信号,ListDirectory 无法告诉调用者通过通道返回的项目集不完整,因为中途遇到了错误。调用方无法区分空目录与完全从目录读取的错误之间的区别。这两种方法都会导致从 ListDirectory 返回的通道会立即关闭。
- 调用者必须继续从通道读取,直到它关闭,因为这是调用者知道开始填充通道的 goroutine 已经停止的唯一方法。这对 ListDirectory 的使用是一个严重的限制,调用者必须花时间从通道读取数据,即使它可能已经收到了它想要的答案。对于大中型目录,它可能在内存使用方面更为高效,但这种方法并不比原始的基于 slice 的方法快。
解决之道是使用回调函数func ListDirectory(dir string, fn func(string)) error
filepath.WalkDir 也是类似的模型,如果函数启动 goroutine,则必须向调用方提供显式停止该goroutine 的方法。通常,将异步执行函数的决定权交给该函数的调用方通常更容易。
管理好整个生命周期
// leak is a buggy function. It launches a goroutine that
// blocks receiving from a channel. Nothing will ever be
// sent on that channel and the channel is never closed so
// that goroutine will be blocked forever.
func leak() {
ch := make(chan int)
go func() {
val := <-ch
fmt.Println("We received a value:", val)
}()
}
-
开启一个goroutine时要知道它何时会结束
-
或者有什么手段可以让它结束
这样就可以确保不会发生goroutine泄漏
超时控制
package main
import (
"context"
"time"
)
func main() {
tr := NewTracker()
go tr.Run()
_ = tr.Event(context.Background(), "t1")
_ = tr.Event(context.Background(), "t2")
_ = tr.Event(context.Background(), "t3")
time.Sleep(3 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()
// 由写的人去关闭channel
tr.Shutdown(ctx)
}
func NewTracker() *Tracker {
return &Tracker{
ch: make(chan string, 10),
}
}
// Tracker knows how to track events for the application.
type Tracker struct {
ch chan string
stop chan struct{}
}
func (t *Tracker) Event(ctx context.Context, data string) error {
select {
case t.ch <- data:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (t *Tracker) Run() {
for data := range t.ch {
time.Sleep(time.Second)
println(data)
}
t.stop <- struct{}{}
}
func (t *Tracker) Shutdown(ctx context.Context) {
close(t.ch)
select {
case <-t.stop:
case <-ctx.Done():
}
}
一个好点的使用案例
package main
import (
"context"
"fmt"
"net/http"
)
func serve(addr string, handler http.Handler, stop <-chan struct{}) error {
s := http.Server{
Addr: addr,
Handler: handler,
}
go func() {
<-stop // wait for stop signal
s.Shutdown(context.Background())
}()
return s.ListenAndServe()
}
func serveApp(stop <-chan struct{}) error {
mux := http.NewServeMux()
mux.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
fmt.Fprintln(resp, "Hello, QCon!")
})
return serve("0.0.0.0:8080", mux, stop)
}
func serveDebug(stop <-chan struct{}) error {
return serve("127.0.0.1:8001", http.DefaultServeMux, stop)
}
func main() {
done := make(chan error, 2)
stop := make(chan struct{})
go func() {
done <- serveDebug(stop)
}()
go func() {
done <- serveApp(stop)
}()
var stopped bool
for i := 0; i < cap(done); i++ {
if err := <-done; err != nil {
fmt.Printf("error: %v\n", err)
}
if !stopped {
stopped = true
close(stop)
}
}
}
优点:
- 管理好了所有goroutine的生命周期,不会有泄露
- 任何一个goroutine出现问题,都会让所有其他的goroutine优雅退出,不会造成任何数据的丢失
- 所有需要后台执行的任务都是由调用者来决定
- 调用者开启goroutine时有结束它的手段
Reference
- Why goroutines instead of threads?
- Practical Go: Real world advice for writing maintainable Go programs
- Goroutine Leaks - The Forgotten Sender
- Concurrency Trap #2: Incomplete Work
- da440dil/go-workgroup
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。