Go猜想录
大道至简,悟者天成
sync.Mutex 源码分析

摘要

  • Mutex 的公平性:Go 的锁是非公平锁。设计为非公平锁是为了提高性能,因为公平锁虽然可以防止饥饿问题,但会带来较高的调度开销,导致吞吐量下降。
  • Mutex 的两种模式:Go 的 Mutex正常模式饥饿模式。在正常模式下,锁是非公平的,新来的 goroutine 有机会直接获取锁;当某个 goroutine 等待锁超过 1 毫秒时,锁会切换到饥饿模式,优先分配给等待时间最长的 goroutine。锁会在队列中只剩下一个 goroutine 或等待时间小于 1 毫秒时切回正常模式。
  • 为什么设计为这两种模式:非公平锁在竞争不激烈时性能更好,而饥饿模式可以防止高竞争情况下的 goroutine 饥饿。相比公平锁,这种设计在保证性能的同时能有效应对饥饿问题。
  • 新来的 goroutine 是否能获取锁:是的,即使队列中有 goroutine 等待锁,新来的 goroutine 大概率能直接获取锁(正常模式下)。
  • Mutex 是否是可重入锁:不是。Mutex 不支持同一 goroutine 多次获取同一把锁,否则会导致死锁。
  • RWMutex 和 Mutex 的区别RWMutex 允许多个读操作并发进行,但写操作需要独占锁。对于大多数是读操作的场景,应优先选择 RWMutex;如果几乎都是写操作,则使用 Mutex
  • Mutex 如何挂起和唤醒 goroutine:当 goroutine 无法获取锁时,会通过 sema 字段挂起,使用 runtime_SemacquireMutex 进行等待。当锁可用时,通过 runtime_Semrelease 唤醒等待中的 goroutine。

Mutex 和 RWMutex

Mutex 可以看做是锁,而 RWMutex 则是读写锁。
一般的用法是将 Mutex 或者 RWMutex 和需要被保住的资源封装在一个结构体内。

  • 如果有多个 goroutine 同时读写的资源,就一定要保护起来。
  • 如果多个 goroutine 只读某个资源,那就不需要保护。

使用锁的时候,优先使用 RWMutex。

  • RWMutex:核心就是四个方法
    • RLock
    • RUnlock
    • Lock
    • Unlock
  • Mutex:Lock 和 Unlock
type safeResource struct {
	mu       sync.RWMutex
	resource map[string]string
}

func (s *safeResource) Add(k, v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.resource[k] = v
}

代码演示 — SafeMap 的 LoadOrStore 写法

正确版本

使用 RWMutex 实现 double-check:

  1. 加读锁先检查一遍
  2. 释放读锁
  3. 加写锁
  4. 再检查一遍
// SafeMap 可以看做是 map 的一个线程安全的封装。
type SafeMap[K comparable, V any] struct {
	mu sync.RWMutex
	m  map[K]V
}

// 正确版本
// LoadOrStore 检查 map 中是否存在 key 对应的值,
// 若存在,则取出。若不存在,则存入新值 newVal。
func (s *SafeMap[K, V]) LoadOrStore(key K, newVal V) (V, bool) {
	s.mu.RLock()
	oldVal, ok := s.m[key]
	s.mu.RUnlock()
	if ok {
		return oldVal, true
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	// goroutine1 先执行,key => 1
	// goroutine2 再执行,key => 2
	// 值会被覆盖,所以要 double check

	oldVal, ok = s.m[key]
	if ok {
		return oldVal, true
	}

	s.m[key] = newVal
	return newVal, false
}

错误版本

// 错误版本1: 死锁
func (s *SafeMap[K, V]) LoadOrStoreV1(key K, newVal V) (V, bool) {
	s.mu.RLock()
	oldVal, ok := s.m[key]
	defer s.mu.RUnlock()
	if ok {
		return oldVal, true
	}
	// 读锁未释放,再加写锁会死锁
	s.mu.Lock()
	defer s.mu.Unlock()

	oldVal, ok = s.m[key]
	if ok {
		return oldVal, true
	}

	s.m[key] = newVal
	return newVal, false
}

// 错误版本2: 覆盖
func (s *SafeMap[K, V]) LoadOrStoreV2(key K, newVal V) (V, bool) {
	s.mu.RLock()
	oldVal, ok := s.m[key]
	s.mu.RUnlock()
	if ok {
		return oldVal, true
	}
	s.mu.Lock()
	defer s.mu.Unlock()

	// 无 double check,值可能被覆盖
	s.m[key] = newVal
	return newVal, false
}
func TestLoadOrStoreV1(t *testing.T) {
	m := SafeMap[string, string]{
		m: map[string]string{},
	}
	m.LoadOrStoreV1("k", "v")
}

func TestLoadOrStoreV2(t *testing.T) {
	m := SafeMap[string, string]{
		m: map[string]string{},
	}

	var wg sync.WaitGroup
	wg.Add(2)
	los := func(k, v string) {
		m.LoadOrStoreV2(k, v)
		wg.Done()
	}
	time.AfterFunc(time.Second, func() {
		go los("k", "v1")
		go los("k", "v2")
	})
	wg.Wait()

	fmt.Println(m.m["k"])
}

代码演示 — 实现一个线程安全的 ArrayList

思路:切片本身不是线程安全的,所以最简单的做法就是利用读写锁封装一下。 这也是典型的装饰器模式的应用。

如果考虑扩展性,那么需要预先定义一个 List 接口,后续可以有 ArrayList,LinkedList,锁实现的线程安全 List,以及无锁实现的线程安全 List。

任何非线程安全的类型、接口都可以利用读写锁 + 装饰器模式无侵入式地改造为线程安全的类型、接口

https://github.com/gotomicro/ekit

Mutex 细节

源码分析

锁的一般实现都是依赖于:

  • 自旋作为快路径
    • 自旋可以通过控制次数或者时间来退出循环。
  • 等待队列作为慢路径
    • 慢路径:跟语言特性有关,有些依赖于操作系统线程调度,如 Java,有些是自己管,如 goroutine。

mutex-1.png

锁实现模板

// 伪代码
type Lock struct {
	state int
}

func (l *Lock) Lock() {
	var (
		i      = 0
		locked = false
	)
	// 自旋
	for locked = CAS(UN_LOCK, LOCKED); !locked && i < 10; {
		i++
	}

	if locked {
		return
	}

	// 将自己的线程加入阻塞队列
	enqueue()
}

Go 的 Mutex 大致符合上面的模板,但是做了针对性的优化。

理解关键点:

  • state 就是用来控制锁状态的核心,所谓加锁,就是把 state 修改为某个值,解锁也是类似
  • sema 是用来处理沉睡、唤醒的信号量,依赖于两个 runtime 调用:
    • runtime_SemacquireMutex:sema 加 1 并且挂起 goroutine
    • runtime_Semrelease:sema 减 1 并且唤醒 sema 上等待的一个 goroutine
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
	state int32
	sema  uint32
}

一次性的自旋:一把锁,如果没有人持有它,也没有人抢,那么一个 CAS 操作就能成功。

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}

在锁实现模板里面,我们说自旋是快路径。Go 把这个归到了慢路径里面。 实际上在这个片段里面,还是很快的,因为没有进入等待队列的环节。
所以:理论上的自旋 = Go 的快路径 + Go 慢路径的自旋部分

func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		...
	}
	...
}

后半部分的代码就是控制锁的两种模式,以及进队列,被唤醒的部分了。

  • 正常模式
  • 饥饿模式
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}

如果一个新的 goroutine 进来争夺锁,而且队列里面也有等待的 goroutine,你是设计者,你会把锁给谁?G1 还是 G2。

mutex-2.png

  • 给 G2:毕竟我们要保证公平,先到先得是规矩,不能破坏
  • G1 和 G2 竞争:保证效率。G1 肯定已经占着了 CPU,所以大概率能够拿到锁

正常模式下,G1 和 G2 竞争锁的机会均等,其核心优势是减少 goroutine 的调度开销。然而,如果每次 G2 试图获取锁时都被新来的 G1 抢先,G2 及其他队列中的 goroutine 可能会陷入饥饿状态。

当 G2 未能获取锁并返回队列头时,如果等待时间超过 1 毫秒,锁将切换为饥饿模式。在饥饿模式下,锁会优先分配给队列中的 goroutine。

mutex-3.png

饥饿模式,必须满足以下条件之一:队列中只剩一个 goroutine,或 G2 的等待时间小于 1 毫秒。

mutex-4.png

步骤总结

  1. 先上来一个 CAS 操作,如果这把锁正空闲,并且没人抢,那么就直接成功
  2. 否则,自旋几次,如果这个时候成功了,也不用加入队列
  3. 否则,加入队列
  4. 从队列中被唤醒:
    • 正常模式:和新来的一起抢锁,但是大概率失败
    • 饥饿模式:肯定拿到锁

mutex-5.png

解锁

  • 上来就是一个 atomic 操作,解锁。理论上来说这也应该是一个 CAS 操作,即必须是加锁状态才能解锁,Go 这种写法效果是一样的;
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}
  • 解锁失败则是步入慢路径,也就是要唤醒等待队列里面的 goroutine。因为 Go 的锁有两种模式,所以你们能够看到两个分支
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			// 正常状态
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		// 饥饿状态	
		runtime_Semrelease(&m.sema, true, 1)
	}

mutex-6.png

左边这里释放锁就会唤醒右边阻塞的 goroutine

Mutex 和 RWMutex 注意事项

  • RWMutex 适合于读多写少的场景,写多读少不如直接加写锁
// 写多读少
func (s *SafeMap[K, V]) CheckAndDoSomethingV1() {
	s.mu.Lock()
	// check and do something
	s.mu.Unlock()
}

func (s *SafeMap[K, V]) CheckAndDoSomethingV2() {
	s.mu.RLock()
	// check first time
	s.mu.RUnlock()

	s.mu.Lock()
	defer s.mu.Unlock()
	// check second time

	// check and do something
}
  • 可以考虑使用函数式写法,如延迟初始化
type valProvider func() any

func (s *SafeMap[K, V]) LoadOrStoreHeavy(key K, f valProvider) (V, bool) {
	s.mu.RLock()
	oldVal, ok := s.m[key]
	s.mu.RUnlock()
	if ok {
		return oldVal, true
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	oldVal, ok = s.m[key]
	if ok {
		return oldVal, true
	}

	// 延迟初始化
	newVal := f().(V)
	s.m[key] = newVal
	return newVal, false
}
  • Mutex 和 RWMutex 都是不可重入的
// 递归非常容易错
func main() {
	RecursiveA()
}

var l sync.Mutex

func RecursiveA() {
	l.Lock()
	defer l.Unlock()
	RecursiveB()
}

func RecursiveB() {
	RecursiveC()
}

func RecursiveC() {
	l.Lock()
	defer l.Unlock()
	RecursiveA()
}
  • 尽可能用 defer 来解锁,避免 panic
    • panic 是直接中断,所以如果你此时拿着锁,那么你是没有办法释放掉的。但是 defer 可以确保即便你 panic 了,这个动作也会执行
func (s *SafeListDecorator[T]) Set(index int, t T) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.l.Set(index, t)
}

知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。