设计思落
效率 :快速加锁,减少阻塞
公平 :多竞争场景下,先到先得,防止饥饿
golang的锁有两种模式,正常模式和饥饿模式,兼容效率和公平
源码解析
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
starvationThresholdNs = 1e6
)
注释翻译
在正常状态下,所有等待锁的 goroutine 按照 FIFO 顺序等待。但是,刚唤醒(即出队)的 goroutine 不会直接拥有锁,而是会和新请求锁的 goroutine 去竞争锁。新请求锁的 goroutine 具有一个优势:它正在 CPU 上执行。而且可能有好几个 goroutine 同时在新请求锁,所以刚刚唤醒的 goroutine 有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的 goroutine 在没有获得锁之后会加入到等待队列的最前面。如果一个等待的 goroutine 超过 1ms 没有获取锁,那么它将会把锁转变为饥饿模式。
在饥饿模式下,锁的所有权将从执行 unlock 的 goroutine 直接交给等待队列中的第一个等待锁者。新来的 goroutine 将不能再去尝试竞争锁,即使锁是 unlock 状态,也不会去尝试自旋操作,而是放在等待队列的尾部。
如果一个等待的 goroutine 获取了锁,并且满足以下其中一个条件:(1)它是等待队列中的最后一个;(2)它等待的时候小于1ms,那么该 goroutine 会将锁的状态转换为正常状态。 正常模式具有较好的性能,因为 goroutine 可以连续多次尝试获取锁,即使还有其他的阻塞等待锁的 goroutine,也不需要进入休眠阻塞。饥饿模式也很重要的,它的作用是阻止尾部延迟的现象。
数据结构
type Mutex struct {
state int32
sema uint32
}
state用于锁的状态流转,是个复合变量,用bit位代表不同的含义
第0位表示是否加锁状态。
第1位表示是否存在唤醒的goroutine。
第2位表示锁当前处在什么模式:正常模式/饥饿模式。
3-31位用于表示等待goroutine的个数。
sema是信号量,用于等待goroutine的唤醒操作。
加锁
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)是原子操作,用于判断锁是否为初始状态并尝试加锁,如果成功直接返回,提高加锁效率.
如果加锁失败说明锁已被占用,则调用lockSlow去做相应的操作.
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
// 非饥饿模式并且可以自旋
// 饥饿模式下是直接将锁交给下一位,所以我们这里是不可以获取锁的
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// 尝试将唤醒位置1,避免没有必要的唤醒.
// 这里可以看出正常模式下对新来的goroutine比较友好.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 非饥饿模式下可以尝试获取锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 饥饿模式下只能进等待队列了
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
// 如果当前为饥饿模式,将锁切换为饥饿模式,如果当前是未加锁就不要切换,
// 因为饥饿模式下必须要有等待者,在此case下不一定成立
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
// 如果当前goroutine是被唤醒的就需要设置被唤醒位
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 { // 如果之前就是解锁状态切非饥饿
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 阻塞排队
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
// 如果当前goroutine被唤醒且Mutex处于饥饿模式,
// 锁的拥有者将锁交到当前goroutine但是锁处于不一致的状态:
// mutexLocked还没有被设置并且我们还被算成一个等待者,需要修复改状态
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
// 如果当前为非饥饿模式或者自己是等待队列里的最后一个,则调整锁模式为正常模式
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// 非饥饿模式下重新竞争锁,没有优势
awoke = true
iter = 0
} else {
old = m.state // 重来一次
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
lockSlow主要做下面的操作
会首先判断当前能不能进入自旋状态:
当前互斥锁的状态是非饥饿状态,并且已经被锁定了
自旋次数不超过 4 次
cpu 个数大于一,必须要是多核 cpu
当前正在执行当中,并且队列空闲的 p 的个数大于等于一
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
在自旋的过程中会尝试设置 mutexWoken 来通知解锁,从而避免唤醒其他已经休眠的goroutine, 在自旋模式下,当前的goroutine就能更快的获取到锁(line: 14)。
- 自旋完成之后,就会去计算当前的锁的状态,如果当前处在饥饿模式下则不会去请求锁(line:25)。
- 状态计算完成之后就会尝试使用 CAS 操作获取锁,如果获取成功就会直接退出循环。
- 如果没有获取到就调用 runtime_SemacquireMutex(&m.sema, queueLifo, 1) 方法休眠当前 goroutine 并且尝试获取信号量。runtime_SemacquireMutex 会休眠当前goroutine并等待信号量唤醒goroutine
- Semacquire 等待直到 *s > 0 并且自动减少*s值
- SemacquireMutex 跟 Semacquire类似,不过是为了互斥锁特殊优化的, 如果lifo为true直接将当前goroutine放在阻塞队列首位
- goroutine 被唤醒之后会先判断当前是否处在饥饿状态,(如果当前 goroutine 超过 1ms 都没有获取到锁就会进饥饿模式) 1. 如果处在饥饿状态就会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出 2. 如果不在,就会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环
解锁
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
// 解锁一个没有锁定的互斥量会报运行时错误.
// 一个加锁的Mutex并不和特定的goroutine绑定,允许一个goroutine对mutex加锁然后安排另一个goroutine解锁。
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 { // 如果new等于零表示没有其他任何需要通知的goroutine。直接返回
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
// 解锁一个未加锁的Mutex会报错
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 如果没有等待的goroutine或者已经有被唤醒的goroutine或者已经获得锁或者已经变成饥饿模式,则直接退出
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
// 将等待着减一并且设置唤醒位
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 唤醒等待goroutine
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 饥饿模式下直接唤醒队首的goroutine并且让出时间片使waiter可以立即运行
// 注意:此时mutexLocked还未设置,waiter在被唤醒后会进行设置,
// 但是在饥饿模式下Mutex会仍然被认为在加锁状态,因此新来的goroutine不会获取到锁
runtime_Semrelease(&m.sema, true, 1)
}
}
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
// Semrelease 自动增加s的值并且唤醒一个阻塞的goroutine。
// 如果handoff等于true,直接将count传递给第一个等待者
! func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
饥饿模式解锁
state 有 mutexStarving 标识、无 mutexLocked 标识,而被唤醒的 goroutine 会设置 mutexLocked 状态、将等待的 goroutine 数减 1。
注意的是,并不会马上清除 mutexStarving 标识,而是一直在饥饿模式下操作,直到遇到第一个符合条件的 goroutine 才会清除 mutexStarving 标识。 在饥饿模式期间,如果有新的 goroutine 来请求抢占锁,mutex 会被认为还处于锁状态,所以新来的goroutine 不会进行抢占锁操作。
总结
Mutex 通过正常模式和饥饿模式两种模式来平衡锁的效率和公平性。