摘要
- Mutex 的公平性:Go 的锁是非公平锁。设计为非公平锁是为了提高性能,因为公平锁虽然可以防止饥饿问题,但会带来较高的调度开销,导致吞吐量下降。
- Mutex 的两种模式:Go 的
Mutex
有 正常模式 和 饥饿模式。在正常模式下,锁是非公平的,新来的 goroutine 有机会直接获取锁;当某个 goroutine 等待锁超过 1 毫秒时,锁会切换到饥饿模式,优先分配给等待时间最长的 goroutine。锁会在队列中只剩下一个 goroutine 或等待时间小于 1 毫秒时切回正常模式。 - 为什么设计为这两种模式:非公平锁在竞争不激烈时性能更好,而饥饿模式可以防止高竞争情况下的 goroutine 饥饿。相比公平锁,这种设计在保证性能的同时能有效应对饥饿问题。
- 新来的 goroutine 是否能获取锁:是的,即使队列中有 goroutine 等待锁,新来的 goroutine 大概率能直接获取锁(正常模式下)。
- Mutex 是否是可重入锁:不是。
Mutex
不支持同一 goroutine 多次获取同一把锁,否则会导致死锁。 - RWMutex 和 Mutex 的区别:
RWMutex
允许多个读操作并发进行,但写操作需要独占锁。对于大多数是读操作的场景,应优先选择RWMutex
;如果几乎都是写操作,则使用Mutex
。 - Mutex 如何挂起和唤醒 goroutine:当 goroutine 无法获取锁时,会通过
sema
字段挂起,使用runtime_SemacquireMutex
进行等待。当锁可用时,通过runtime_Semrelease
唤醒等待中的 goroutine。
Mutex 和 RWMutex
Mutex 可以看做是锁,而 RWMutex 则是读写锁。
一般的用法是将 Mutex 或者 RWMutex 和需要被保住的资源封装在一个结构体内。
- 如果有多个 goroutine 同时读写的资源,就一定要保护起来。
- 如果多个 goroutine 只读某个资源,那就不需要保护。
使用锁的时候,优先使用 RWMutex。
- RWMutex:核心就是四个方法
- RLock
- RUnlock
- Lock
- Unlock
- Mutex:Lock 和 Unlock
type safeResource struct {
mu sync.RWMutex
resource map[string]string
}
func (s *safeResource) Add(k, v string) {
s.mu.Lock()
defer s.mu.Unlock()
s.resource[k] = v
}
代码演示 — SafeMap 的 LoadOrStore 写法
正确版本
使用 RWMutex 实现 double-check:
- 加读锁先检查一遍
- 释放读锁
- 加写锁
- 再检查一遍
// SafeMap 可以看做是 map 的一个线程安全的封装。
type SafeMap[K comparable, V any] struct {
mu sync.RWMutex
m map[K]V
}
// 正确版本
// LoadOrStore 检查 map 中是否存在 key 对应的值,
// 若存在,则取出。若不存在,则存入新值 newVal。
func (s *SafeMap[K, V]) LoadOrStore(key K, newVal V) (V, bool) {
s.mu.RLock()
oldVal, ok := s.m[key]
s.mu.RUnlock()
if ok {
return oldVal, true
}
s.mu.Lock()
defer s.mu.Unlock()
// goroutine1 先执行,key => 1
// goroutine2 再执行,key => 2
// 值会被覆盖,所以要 double check
oldVal, ok = s.m[key]
if ok {
return oldVal, true
}
s.m[key] = newVal
return newVal, false
}
错误版本
// 错误版本1: 死锁
func (s *SafeMap[K, V]) LoadOrStoreV1(key K, newVal V) (V, bool) {
s.mu.RLock()
oldVal, ok := s.m[key]
defer s.mu.RUnlock()
if ok {
return oldVal, true
}
// 读锁未释放,再加写锁会死锁
s.mu.Lock()
defer s.mu.Unlock()
oldVal, ok = s.m[key]
if ok {
return oldVal, true
}
s.m[key] = newVal
return newVal, false
}
// 错误版本2: 覆盖
func (s *SafeMap[K, V]) LoadOrStoreV2(key K, newVal V) (V, bool) {
s.mu.RLock()
oldVal, ok := s.m[key]
s.mu.RUnlock()
if ok {
return oldVal, true
}
s.mu.Lock()
defer s.mu.Unlock()
// 无 double check,值可能被覆盖
s.m[key] = newVal
return newVal, false
}
func TestLoadOrStoreV1(t *testing.T) {
m := SafeMap[string, string]{
m: map[string]string{},
}
m.LoadOrStoreV1("k", "v")
}
func TestLoadOrStoreV2(t *testing.T) {
m := SafeMap[string, string]{
m: map[string]string{},
}
var wg sync.WaitGroup
wg.Add(2)
los := func(k, v string) {
m.LoadOrStoreV2(k, v)
wg.Done()
}
time.AfterFunc(time.Second, func() {
go los("k", "v1")
go los("k", "v2")
})
wg.Wait()
fmt.Println(m.m["k"])
}
代码演示 — 实现一个线程安全的 ArrayList
思路:切片本身不是线程安全的,所以最简单的做法就是利用读写锁封装一下。 这也是典型的装饰器模式的应用。
如果考虑扩展性,那么需要预先定义一个 List 接口,后续可以有 ArrayList,LinkedList,锁实现的线程安全 List,以及无锁实现的线程安全 List。
任何非线程安全的类型、接口都可以利用读写锁 + 装饰器模式无侵入式地改造为线程安全的类型、接口
https://github.com/gotomicro/ekit
Mutex 细节
源码分析
锁的一般实现都是依赖于:
- 自旋作为快路径
- 自旋可以通过控制次数或者时间来退出循环。
- 等待队列作为慢路径
- 慢路径:跟语言特性有关,有些依赖于操作系统线程调度,如 Java,有些是自己管,如 goroutine。
锁实现模板
// 伪代码
type Lock struct {
state int
}
func (l *Lock) Lock() {
var (
i = 0
locked = false
)
// 自旋
for locked = CAS(UN_LOCK, LOCKED); !locked && i < 10; {
i++
}
if locked {
return
}
// 将自己的线程加入阻塞队列
enqueue()
}
Go 的 Mutex 大致符合上面的模板,但是做了针对性的优化。
理解关键点:
- state 就是用来控制锁状态的核心,所谓加锁,就是把 state 修改为某个值,解锁也是类似
- sema 是用来处理沉睡、唤醒的信号量,依赖于两个 runtime 调用:
- runtime_SemacquireMutex:sema 加 1 并且挂起 goroutine
- runtime_Semrelease:sema 减 1 并且唤醒 sema 上等待的一个 goroutine
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
一次性的自旋:一把锁,如果没有人持有它,也没有人抢,那么一个 CAS 操作就能成功。
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
在锁实现模板里面,我们说自旋是快路径。Go 把这个归到了慢路径里面。
实际上在这个片段里面,还是很快的,因为没有进入等待队列的环节。
所以:理论上的自旋 = Go 的快路径 + Go 慢路径的自旋部分
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
...
}
...
}
后半部分的代码就是控制锁的两种模式,以及进队列,被唤醒的部分了。
- 正常模式
- 饥饿模式
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
如果一个新的 goroutine 进来争夺锁,而且队列里面也有等待的 goroutine,你是设计者,你会把锁给谁?G1 还是 G2。
- 给 G2:毕竟我们要保证公平,先到先得是规矩,不能破坏
- G1 和 G2 竞争:保证效率。G1 肯定已经占着了 CPU,所以大概率能够拿到锁
正常模式下,G1 和 G2 竞争锁的机会均等,其核心优势是减少 goroutine 的调度开销。然而,如果每次 G2 试图获取锁时都被新来的 G1 抢先,G2 及其他队列中的 goroutine 可能会陷入饥饿状态。
当 G2 未能获取锁并返回队列头时,如果等待时间超过 1 毫秒,锁将切换为饥饿模式。在饥饿模式下,锁会优先分配给队列中的 goroutine。
饥饿模式,必须满足以下条件之一:队列中只剩一个 goroutine,或 G2 的等待时间小于 1 毫秒。
步骤总结
- 先上来一个 CAS 操作,如果这把锁正空闲,并且没人抢,那么就直接成功
- 否则,自旋几次,如果这个时候成功了,也不用加入队列
- 否则,加入队列
- 从队列中被唤醒:
- 正常模式:和新来的一起抢锁,但是大概率失败
- 饥饿模式:肯定拿到锁
解锁
- 上来就是一个 atomic 操作,解锁。理论上来说这也应该是一个 CAS 操作,即必须是加锁状态才能解锁,Go 这种写法效果是一样的;
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
- 解锁失败则是步入慢路径,也就是要唤醒等待队列里面的 goroutine。因为 Go 的锁有两种模式,所以你们能够看到两个分支
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 正常状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 饥饿状态
runtime_Semrelease(&m.sema, true, 1)
}
左边这里释放锁就会唤醒右边阻塞的 goroutine
Mutex 和 RWMutex 注意事项
- RWMutex 适合于读多写少的场景,写多读少不如直接加写锁
// 写多读少
func (s *SafeMap[K, V]) CheckAndDoSomethingV1() {
s.mu.Lock()
// check and do something
s.mu.Unlock()
}
func (s *SafeMap[K, V]) CheckAndDoSomethingV2() {
s.mu.RLock()
// check first time
s.mu.RUnlock()
s.mu.Lock()
defer s.mu.Unlock()
// check second time
// check and do something
}
- 可以考虑使用函数式写法,如延迟初始化
type valProvider func() any
func (s *SafeMap[K, V]) LoadOrStoreHeavy(key K, f valProvider) (V, bool) {
s.mu.RLock()
oldVal, ok := s.m[key]
s.mu.RUnlock()
if ok {
return oldVal, true
}
s.mu.Lock()
defer s.mu.Unlock()
oldVal, ok = s.m[key]
if ok {
return oldVal, true
}
// 延迟初始化
newVal := f().(V)
s.m[key] = newVal
return newVal, false
}
- Mutex 和 RWMutex 都是不可重入的
// 递归非常容易错
func main() {
RecursiveA()
}
var l sync.Mutex
func RecursiveA() {
l.Lock()
defer l.Unlock()
RecursiveB()
}
func RecursiveB() {
RecursiveC()
}
func RecursiveC() {
l.Lock()
defer l.Unlock()
RecursiveA()
}
- 尽可能用 defer 来解锁,避免 panic
- panic 是直接中断,所以如果你此时拿着锁,那么你是没有办法释放掉的。但是 defer 可以确保即便你 panic 了,这个动作也会执行
func (s *SafeListDecorator[T]) Set(index int, t T) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.l.Set(index, t)
}
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。