您的位置 首页 golang

Golang Mutex学习

设计思落

效率 :快速加锁,减少阻塞

公平 :多竞争场景下,先到先得,防止饥饿

golang的锁有两种模式,正常模式和饥饿模式,兼容效率和公平

源码解析

 // A Mutex is a mutual exclusion lock.
  // The zero value for a Mutex is an unlocked mutex.
  //
  // A Mutex must not be copied after first use.
  type Mutex struct {
      state int32
      sema  uint32
  }

  // A Locker represents an object that can be locked and unlocked.
  type Locker interface {
      Lock()
      Unlock()
  }

  const (
      mutexLocked = 1 << iota // mutex is locked
      mutexWoken
      mutexStarving
      mutexWaiterShift = iota

      // Mutex fairness.
      //
      // Mutex can be in 2 modes of operations: normal and starvation.
      // In normal mode waiters are queued in FIFO order, but a woken up waiter
      // does not own the mutex and competes with new arriving goroutines over
      // the ownership. New arriving goroutines have an advantage -- they are
      // already running on CPU and there can be lots of them, so a woken up
      // waiter has good chances of losing. In such case it is queued at front
      // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
      // it switches mutex to the starvation mode.
      //
      // In starvation mode ownership of the mutex is directly handed off from
      // the unlocking goroutine to the waiter at the front of the queue.
      // New arriving goroutines don't try to acquire the mutex even if it appears
      // to be unlocked, and don't try to spin. Instead they queue themselves at
      // the tail of the wait queue.
      //
      // If a waiter receives ownership of the mutex and sees that either
      // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
      // it switches mutex back to normal operation mode.
      //
      // Normal mode has considerably better performance as a goroutine can acquire
      // a mutex several times in a row even if there are blocked waiters.
      // Starvation mode is important to prevent pathological cases of tail latency.
      starvationThresholdNs = 1e6
  )  

注释翻译

在正常状态下,所有等待锁的 goroutine 按照 FIFO 顺序等待。但是,刚唤醒(即出队)的 goroutine 不会直接拥有锁,而是会和新请求锁的 goroutine 去竞争锁。新请求锁的 goroutine 具有一个优势:它正在 CPU 上执行。而且可能有好几个 goroutine 同时在新请求锁,所以刚刚唤醒的 goroutine 有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的 goroutine 在没有获得锁之后会加入到等待队列的最前面。如果一个等待的 goroutine 超过 1ms 没有获取锁,那么它将会把锁转变为饥饿模式。

在饥饿模式下,锁的所有权将从执行 unlock 的 goroutine 直接交给等待队列中的第一个等待锁者。新来的 goroutine 将不能再去尝试竞争锁,即使锁是 unlock 状态,也不会去尝试自旋操作,而是放在等待队列的尾部。

如果一个等待的 goroutine 获取了锁,并且满足以下其中一个条件:(1)它是等待队列中的最后一个;(2)它等待的时候小于1ms,那么该 goroutine 会将锁的状态转换为正常状态。 正常模式具有较好的性能,因为 goroutine 可以连续多次尝试获取锁,即使还有其他的阻塞等待锁的 goroutine,也不需要进入休眠阻塞。饥饿模式也很重要的,它的作用是阻止尾部延迟的现象。

数据结构

 type Mutex struct {
      state int32
      sema  uint32
  }  

state用于锁的状态流转,是个复合变量,用bit位代表不同的含义

第0位表示是否加锁状态。

第1位表示是否存在唤醒的goroutine。

第2位表示锁当前处在什么模式:正常模式/饥饿模式。

3-31位用于表示等待goroutine的个数。

sema是信号量,用于等待goroutine的唤醒操作。

加锁

 // Lock locks m.
  // If the lock is already in use, the calling goroutine
  // blocks until the mutex is available.
  func (m *Mutex) Lock() {
      // Fast path: grab unlocked mutex.
      if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
          if race.Enabled {
              race.Acquire(unsafe.Pointer(m))
          }
          return
      }
      // Slow path (outlined so that the fast path can be inlined)
      m.lockSlow()
  }  

atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)是原子操作,用于判断锁是否为初始状态并尝试加锁,如果成功直接返回,提高加锁效率.

如果加锁失败说明锁已被占用,则调用lockSlow去做相应的操作.

 func (m *Mutex) lockSlow() {
      var waitStartTime int64
      starving := false
      awoke := false
      iter := 0
      old := m.state
      for {
          // Don't spin in starvation mode, ownership is handed off to waiters
          // so we won't be able to acquire the mutex anyway.
          // 非饥饿模式并且可以自旋
          // 饥饿模式下是直接将锁交给下一位,所以我们这里是不可以获取锁的
          if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
              // Active spinning makes sense.
              // Try to set mutexWoken flag to inform Unlock
              // to not wake other blocked goroutines.
              // 尝试将唤醒位置1,避免没有必要的唤醒.
              // 这里可以看出正常模式下对新来的goroutine比较友好.
              if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                  atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                  awoke = true
              }
              runtime_doSpin()
              iter++
              old = m.state
              continue
          }
          new := old
          // Don't try to acquire starving mutex, new arriving goroutines must queue.
          // 非饥饿模式下可以尝试获取锁
          if old&mutexStarving == 0 {
              new |= mutexLocked
          }
          // 饥饿模式下只能进等待队列了
          if old&(mutexLocked|mutexStarving) != 0 {
              new += 1 << mutexWaiterShift
          }
          // The current goroutine switches mutex to starvation mode.
          // But if the mutex is currently unlocked, don't do the switch.
          // Unlock expects that starving mutex has waiters, which will not
          // be true in this case.
          // 如果当前为饥饿模式,将锁切换为饥饿模式,如果当前是未加锁就不要切换,
          // 因为饥饿模式下必须要有等待者,在此case下不一定成立
          if starving && old&mutexLocked != 0 {
              new |= mutexStarving
          }
          if awoke {
              // The goroutine has been woken from sleep,
              // so we need to reset the flag in either case.
              // 如果当前goroutine是被唤醒的就需要设置被唤醒位
              if new&mutexWoken == 0 {
                  throw("sync: inconsistent mutex state")
              }
              new &^= mutexWoken
          }
          if atomic.CompareAndSwapInt32(&m.state, old, new) {
              if old&(mutexLocked|mutexStarving) == 0 { // 如果之前就是解锁状态切非饥饿
                  break // locked the mutex with CAS
              }
              // If we were already waiting before, queue at the front of the queue.
              queueLifo := waitStartTime != 0
              if waitStartTime == 0 {
                  waitStartTime = runtime_nanotime()
              }
              runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 阻塞排队
              starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
              old = m.state
              if old&mutexStarving != 0 {
                  // If this goroutine was woken and mutex is in starvation mode,
                  // ownership was handed off to us but mutex is in somewhat
                  // inconsistent state: mutexLocked is not set and we are still
                  // accounted as waiter. Fix that.
                  // 如果当前goroutine被唤醒且Mutex处于饥饿模式,
                  // 锁的拥有者将锁交到当前goroutine但是锁处于不一致的状态:
                  // mutexLocked还没有被设置并且我们还被算成一个等待者,需要修复改状态
                  
                  if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                      throw("sync: inconsistent mutex state")
                  }
                  delta := int32(mutexLocked - 1<<mutexWaiterShift)
                  if !starving || old>>mutexWaiterShift == 1 {
                      // Exit starvation mode.
                      // Critical to do it here and consider wait time.
                      // Starvation mode is so inefficient, that two goroutines
                      // can go lock-step infinitely once they switch mutex
                      // to starvation mode.
                      // 如果当前为非饥饿模式或者自己是等待队列里的最后一个,则调整锁模式为正常模式
                      delta -= mutexStarving
                  }
                  atomic.AddInt32(&m.state, delta)
                  break
              }
              // 非饥饿模式下重新竞争锁,没有优势
              awoke = true
              iter = 0
          } else {
              old = m.state // 重来一次
          }
      }

      if race.Enabled {
          race.Acquire(unsafe.Pointer(m))
      }
  }  

lockSlow主要做下面的操作

会首先判断当前能不能进入自旋状态:

当前互斥锁的状态是非饥饿状态,并且已经被锁定了

自旋次数不超过 4 次

cpu 个数大于一,必须要是多核 cpu

当前正在执行当中,并且队列空闲的 p 的个数大于等于一

 // Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
        if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
                return false
        }
        if p := getg().m.p.ptr(); !runqempty(p) {
                return false
        }
        return true
}  

在自旋的过程中会尝试设置 mutexWoken 来通知解锁,从而避免唤醒其他已经休眠的goroutine, 在自旋模式下,当前的goroutine就能更快的获取到锁(line: 14)。

  1. 自旋完成之后,就会去计算当前的锁的状态,如果当前处在饥饿模式下则不会去请求锁(line:25)。
  2. 状态计算完成之后就会尝试使用 CAS 操作获取锁,如果获取成功就会直接退出循环。
  3. 如果没有获取到就调用 runtime_SemacquireMutex(&m.sema, queueLifo, 1) 方法休眠当前 goroutine 并且尝试获取信号量。runtime_SemacquireMutex 会休眠当前goroutine并等待信号量唤醒goroutine
  4. Semacquire 等待直到 *s > 0 并且自动减少*s值
  5. SemacquireMutex 跟 Semacquire类似,不过是为了互斥锁特殊优化的, 如果lifo为true直接将当前goroutine放在阻塞队列首位
  6. goroutine 被唤醒之后会先判断当前是否处在饥饿状态,(如果当前 goroutine 超过 1ms 都没有获取到锁就会进饥饿模式) 1. 如果处在饥饿状态就会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出 2. 如果不在,就会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环

解锁

 // Unlock unlocks m.
  // It is a run-time error if m is not locked on entry to Unlock.
  //
  // A locked Mutex is not associated with a particular goroutine.
  // It is allowed for one goroutine to lock a Mutex and then
  // arrange for another goroutine to unlock it.
  // 解锁一个没有锁定的互斥量会报运行时错误.
  // 一个加锁的Mutex并不和特定的goroutine绑定,允许一个goroutine对mutex加锁然后安排另一个goroutine解锁。
  func (m *Mutex) Unlock() {
      if race.Enabled {
          _ = m.state
          race.Release(unsafe.Pointer(m))
      }

      // Fast path: drop lock bit.
      new := atomic.AddInt32(&m.state, -mutexLocked)
      if new != 0 { // 如果new等于零表示没有其他任何需要通知的goroutine。直接返回
          // Outlined slow path to allow inlining the fast path.
          // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
          m.unlockSlow(new)
      }
  }

  func (m *Mutex) unlockSlow(new int32) {
      // 解锁一个未加锁的Mutex会报错
      if (new+mutexLocked)&mutexLocked == 0 {
          throw("sync: unlock of unlocked mutex")
      }
      if new&mutexStarving == 0 {
          old := new
          for {
              // If there are no waiters or a goroutine has already
              // been woken or grabbed the lock, no need to wake anyone.
              // In starvation mode ownership is directly handed off from unlocking
              // goroutine to the next waiter. We are not part of this chain,
              // since we did not observe mutexStarving when we unlocked the mutex above.
              // So get off the way.
              // 如果没有等待的goroutine或者已经有被唤醒的goroutine或者已经获得锁或者已经变成饥饿模式,则直接退出
              if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                  return
              }
              // Grab the right to wake someone.
              // 将等待着减一并且设置唤醒位
              new = (old - 1<<mutexWaiterShift) | mutexWoken
              if atomic.CompareAndSwapInt32(&m.state, old, new) {
                  // 唤醒等待goroutine
                  runtime_Semrelease(&m.sema, false, 1)
                  return
              }
              old = m.state
          }
      } else {
          // Starving mode: handoff mutex ownership to the next waiter, and yield
          // our time slice so that the next waiter can start to run immediately.
          // Note: mutexLocked is not set, the waiter will set it after wakeup.
          // But mutex is still considered locked if mutexStarving is set,
          // so new coming goroutines won't acquire it.
          // 饥饿模式下直接唤醒队首的goroutine并且让出时间片使waiter可以立即运行
          // 注意:此时mutexLocked还未设置,waiter在被唤醒后会进行设置,
          // 但是在饥饿模式下Mutex会仍然被认为在加锁状态,因此新来的goroutine不会获取到锁
          runtime_Semrelease(&m.sema, true, 1)
      }
  }
  
  // Semrelease atomically increments *s and notifies a waiting goroutine
  // if one is blocked in Semacquire.
  // It is intended as a simple wakeup primitive for use by the synchronization
  // library and should not be used directly.
  // If handoff is true, pass count directly to the first waiter.
  // skipframes is the number of frames to omit during tracing, counting from
  // runtime_Semrelease's caller.
  // Semrelease 自动增加s的值并且唤醒一个阻塞的goroutine。
  // 如果handoff等于true,直接将count传递给第一个等待者
! func runtime_Semrelease(s *uint32, handoff bool, skipframes int)  

饥饿模式解锁

state 有 mutexStarving 标识、无 mutexLocked 标识,而被唤醒的 goroutine 会设置 mutexLocked 状态、将等待的 goroutine 数减 1。

注意的是,并不会马上清除 mutexStarving 标识,而是一直在饥饿模式下操作,直到遇到第一个符合条件的 goroutine 才会清除 mutexStarving 标识。 在饥饿模式期间,如果有新的 goroutine 来请求抢占锁,mutex 会被认为还处于锁状态,所以新来的goroutine 不会进行抢占锁操作。

总结

Mutex 通过正常模式和饥饿模式两种模式来平衡锁的效率和公平性。

文章来源:智云一二三科技

文章标题:Golang Mutex学习

文章地址:https://www.zhihuclub.com/97353.shtml

关于作者: 智云科技

热门文章

网站地图