Go 互斥锁Mutex

Mutex是一个互斥锁,可以创建为其他结构体的字段;零值为解锁状态 。Mutex类型的锁和线程无关,可以由不同的线程加锁和解锁 。互斥锁的作用是保证共享资源同一时刻只能被一个 Goroutine 占用,一个 Goroutine 占用了 , 其他的 Goroutine 则阻塞等待 。
1、数据结构type Mutex struct {state int32// 表示当前互斥锁的状态semauint32// 信号量变量,用来控制等待 goroutine 的阻塞休眠和唤醒}基于该数据结构 , 实现了两种方法,加锁、释放锁
type Locker interface {Lock()Unlock()}

Go 互斥锁Mutex

文章插图
const (mutexLocked = 1 << iota // 表示锁是否可用(0可用,1被别的goroutine占用),001mutexWoken// 表示mutex是否被唤醒,010mutexStarving// 当前的互斥锁进入饥饿状态,100mutexWaiterShift = iota // 表示统计阻塞在该mutex上的goroutine数目需要移位的数值,1<<(32-3)个)// sema + 1,挂起 goroutine// 1.不断调用尝试获取锁// 2.休眠当前 goroutine// 3.等待信号量,唤醒 goroutineruntime_SemacquireMutex(&m.sema, queueLifo, 1)// sema - 1,唤醒 sema 上等待的一个 goroutineruntime_Semrelease(&m.sema, false, 1)2、模式
Go 互斥锁Mutex

文章插图
2.1、正常模式在正常模式下,等待的 goroutine 会按照先进先出的顺序得到锁 。刚被唤醒的 goroutine 与新创建的 goroutine 竞争时,大概率无法获得锁,如 G1和 G2 竞争 , 此时 G1 已经占着 CPU 了,所以大概率拿到锁 。
如果 goroutine 超过 1ms,没有获取锁,就会将当前锁切换为饥饿模式 。
2.2、饥饿模式【Go 互斥锁Mutex】避免 goroutine 被饿死,1.19 引入了饥饿模式
在饥饿模式下,互斥锁会直接交给等到队列最前面的 goroutine,新的 goroutine 在该状态下不能获取锁 , 也不能进入自旋,只能在队列末尾等待 。
2.3、状态切换正常模式下,
如果队列中只剩一个goroutine 获得了互斥锁或者它等待的时间少于 1ms , 那么就会切换到正常模式 。
3、加锁
Go 互斥锁Mutex

文章插图
1、Fast path// 如果锁没被占用,也不是饥饿状态,也没有唤醒goroutine , 也没有等待goroutine,加锁成功if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {if race.Enabled {race.Acquire(unsafe.Pointer(m))}return}加锁的时候先通过一次 CAS(Compare And Swap) 看能不能拿到锁,如果拿到,直接返回 。
// 先判断参数addr指向的被操作值与参数old的值是否相等// 如果相等,会用参数new代表的新值替换掉原先的旧值,否则 falsefunc CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)2、Slow path如果状态不是 0 ,就会尝试通过自旋等方式等待锁释放,大致分为:
  1. 判断当前 goroutine 能否进入自旋
  2. 通过自旋等待互斥锁的释放
  3. 计算互斥锁的最新状态
  4. 更新互斥锁的状态并获取锁
// 等待时间var waitStartTime int64// 饥饿标记starving := false// 唤醒标记awoke := false// 自旋次数iter := 0// 当前的锁的状态old := m.statefor {// 步骤一// 如果锁是正常状态,锁还没被释放,就自旋// 因为饥饿模式下,需要保证等到队列中的 goroutine 能够获得锁的的所有权,防止等待队列饿死// 如果锁在饥饿模式或已经解锁,或不符合自旋条件就结束自旋if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {// 如果等待队列有 goroutine ,锁没有设置唤醒状态,就设置为唤醒// 用来,当锁解锁时,不会去唤醒已经阻塞的 goroutine,保证自己更大概率拿到锁if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}// 自旋runtime_doSpin()// 自旋次数加1iter++// 设置当前锁的状态old = m.statecontinue}------------------------------------------------------------------------------>// 步骤二// 此时可能锁变为饥饿状态或者已经解锁了,或者不符合自旋条件// 获取锁最新状态new := old// 如果当前是正常模式 , 尝试加锁 。// 饥饿状态下要让出竞争权利,不能加锁if old&mutexStarving == 0 {new |= mutexLocked}// 如果当前被锁定或者处于饥饿模式,把自己放到等待队列,waiter加一,表示等待一个等待计数// 这块的状态,goroutine 只能等着,饥饿状态要让出竞争权利if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift}// 如果已经是饥饿状态,starving为真,并且old 的锁是占用情况 , 更新状态改为饥饿状态if starving && old&mutexLocked != 0 {new |= mutexStarving}// 如果awoke在上面自旋时设置成功 , 那么在这要消除标志位// 因为该 goroutine 要么获得了锁 , 要么进入休眠,和唤醒状态没啥关系// 后续流程会导致当前线程被挂起,需要等待其他释放锁的 goroutine 唤醒 , // 如果 unlock 是发现mutexWoken不是 0,就不会去唤醒if awoke {if new&mutexWoken == 0 {throw("sync: inconsistent mutex state")}// 清除唤醒标志位new &^= mutexWoken}------------------------------------------------------------------------------>// 步骤三if atomic.CompareAndSwapInt32(&m.state, old, new) {// 1.如果原来状态没有上锁,也没有饥饿,那么直接返回 , 表示获取到锁if old&(mutexLocked|mutexStarving) == 0 {break // locked the mutex with CAS}// 2.到这里是没有获取到锁,判断一下等待时长是否不为0// 如果新的 goroutine 来抢占锁,会返回 false// 如果不是新的,那么加入到队列头部// 保证等待最久的 goroutine 优先拿到锁queueLifo := waitStartTime != 0// 3.如果等待时间为0,那么初始化等待时间if waitStartTime == 0 {waitStartTime = runtime_nanotime()}// 如果不等于,说明不是第一次来,是被唤醒后过来的,则加入队列头部,queueLifo=true// 4.阻塞等待 , sema+1,并挂起 goroutine,// 如果后面 goroutine 被唤醒,就从该位置往下执行runtime_SemacquireMutex(&m.sema, queueLifo, 1)// 5.说明该 goroutine 被唤醒// 判断该 goroutine 是否长时间没有获得锁 , 如果是,就是饥饿的 goroutinestarving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs// 被挂起的时间有点长 , 需要重新获取一下当前锁的状态old = m.state// 6.判断是否已经处于饥饿状态,处于,直接获得锁,如果不处于直接跳出// 饥饿状态下,被唤醒的协程直接获得锁 。if old&mutexStarving != 0 {// 饥饿状态下,被唤醒,发现锁没释放,唤醒值是 1,等待列表没有 , 报错if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")}delta := int32(mutexLocked - 1<<mutexWaiterShift)// 7.如果唤醒等待队列的 goroutine 不饥饿,或是等待队列中的最后一个 goroutineif !starving || old>>mutexWaiterShift == 1 {// 就从饥饿模式切换会正常模式delta -= mutexStarving}// 9.设置状态// 将锁状态设置为等待数量减1,同时设置为锁定,加锁成功atomic.AddInt32(&m.state, delta)break}// 当前 goroutine 是被系统唤醒的awoke = true// 重置自旋次数iter = 0} else {// 如果 CAS 失败 , 重新开始old = m.state}}if race.Enabled {race.Acquire(unsafe.Pointer(m))}

推荐阅读