M的状态转换

声明

下面的分析均基于Golang1.14版本。

M的状态

M只有Running和Stop这2个状态,还有一个spinning中间态,当从Running转为Stop时,会先spinning寻找可运行的G,如果找不到则进入Stop。

M状态转换

主要流程

1.mstart,Go程序初始化时,第一个M是由mstart创建,新的物理线程创建时,调用的函数也是mstart。
2.startm,当有新的G创建或者有G从waiting进入running且还有空闲的P,此时会调用startm,获取一个M和空闲的P绑定执行G。
3.newm,当调用startm时,如果没有空闲的M则会通过newm创建M。
4.stopm,在2种情况下会执行stopm,一是当M绑定的P无可运行的G且无法从其它P窃取可运行的G时,M先进入spinning状态,然后退出。二是当M和G进入系统调用后,长时间未退出,P被retake且M找不到空闲的P绑定,此时M会调用stopm。
5.spinning状态,在findrunnable函数中,会短暂进入spinning状态,如果找不到可运行的G则调用stopm。
PS:上述主要流程解释了函数什么时候由谁触发调用,后面不再赘述。

线程的休眠和唤醒

1.M绑定了一个物理线程,M的running和stop就代表了物理线程的状态,那么物理线程是如何休眠和唤醒的呢?下面以Linux操作系统为例介绍物理线程的变化。
2.Linux线程同步通过futex系统调用实现,futex详细介绍

#include <linux/futex.h>
       #include <sys/time.h>

// 主要关注前3个参数,uaddr表示同步的内存地址。
//futex_op表示操作类型,这里使用了FUTEX_WAIT,FUTEX_WAKE这2种类型。
//val在FUTEX_WAIT时表示当uaddr指向的值等于val时休眠
//val在FUTEX_WAKE时表示唤醒休眠在uaddr上的线程数量(Go中默认是1)
       int futex(int *uaddr, int futex_op, int val,
                 const struct timespec *timeout,   /* or: uint32_t val2 */
                 int *uaddr2, int val3);

3.线程休眠
asmcgoyield是执行cgo_yield函数,具体的不深究。

func notesleep(n *note) {
    gp := getg() // 线程休眠只发生在退出系统调用或者schedule函数 因此必然是g0
    if gp != gp.m.g0 {
        throw("notesleep not on g0")
    }
    ns := int64(-1) // 通常休眠后不会主动唤醒
    if *cgo_yield != nil {
        // Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
        ns = 10e6 // 在cgo情况下 每休眠10ms唤醒一次
    }
    for atomic.Load(key32(&n.key)) == 0 { // 当note.key==0时 休眠
        gp.m.blocked = true
        futexsleep(key32(&n.key), 0, ns) // 调用futexsleep进入休眠
        if *cgo_yield != nil {
            asmcgocall(*cgo_yield, nil) // 如果是cgo调用asmcgocall
        }
        gp.m.blocked = false
    }
}

func futexsleep(addr *uint32, val uint32, ns int64) {
    if ns < 0 { // 如果ns < 0 则一直休眠不主动唤醒
        futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
        return
    }

    var ts timespec
    ts.setNsec(ns) // 设置休眠时间
    futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}

4.线程唤醒

func notewakeup(n *note) {
    old := atomic.Xchg(key32(&n.key), 1) // 将note.key设置为1 note.key休眠时为0
    if old != 0 {
        print("notewakeup - double wakeup (", old, ")\n")
        throw("notewakeup - double wakeup")
    }
    futexwakeup(key32(&n.key), 1) // 尝试唤醒note.key
}

func futexwakeup(addr *uint32, cnt uint32) {
    ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
    if ret >= 0 {
        return
    }
 // 正常情况下不会执行到下面的代码
    systemstack(func() {
        print("futexwakeup addr=", addr, " returned ", ret, "\n")
    })

    *(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}

5.总结 M的休眠和唤醒都是通过m.note.key进行同步,对M的休眠和唤醒操作都是操作m.note.key所在的内存。

mstart

此时初始化的M为m0,是Go进程的第一个M。

func mstart() {
    _g_ := getg()

    osStack := _g_.stack.lo == 0
    if osStack {
        // Initialize stack bounds from system stack.
        // Cgo may have left stack size in stack.hi.
        // minit may update the stack bounds.
        size := _g_.stack.hi
        if size == 0 {
            size = 8192 * sys.StackGuardMultiplier
        }
        _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
        _g_.stack.lo = _g_.stack.hi - size + 1024
    }
    // Initialize stack guard so that we can start calling regular
    // Go code.
    _g_.stackguard0 = _g_.stack.lo + _StackGuard
    // This is the g0, so we can also call go:systemstack
    // functions, which check stackguard1.
    _g_.stackguard1 = _g_.stackguard0
    mstart1()

    // Exit this thread.
    switch GOOS {
    case "windows", "solaris", "illumos", "plan9", "darwin", "aix":
        // Windows, Solaris, illumos, Darwin, AIX and Plan 9 always system-allocate
        // the stack, but put it in _g_.stack before mstart,
        // so the logic above hasn't set osStack yet.
        osStack = true
    }
    mexit(osStack)
}

func mstart1() {
    _g_ := getg()

    if _g_ != _g_.m.g0 {
        throw("bad runtime·mstart")
    }

    // Record the caller for use as the top of stack in mcall and
    // for terminating the thread.
    // We're never coming back to mstart1 after we call schedule,
    // so other calls can reuse the current frame.
    save(getcallerpc(), getcallersp()) // 保存pc sp到g0中 此处的pc和sp是mstart调用mstart1时的pc和sp
    asminit() // 针对不同的CPU进行初始化 忽略
    minit() // 主要是将阻塞的信号屏蔽 阻塞该信号

    // Install signal handlers; after minit so that minit can
    // prepare the thread to be able to handle the signals.
    if _g_.m == &m0 {
        mstartm0() // 初始化信号处理函数
    }

    if fn := _g_.m.mstartfn; fn != nil {
        fn()
    }

    if _g_.m != &m0 {
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }
    schedule()
}

func minit() {
    // The alternate signal stack is buggy on arm and arm64.
    // The signal handler handles it directly.
    if GOARCH != "arm" && GOARCH != "arm64" {
        minitSignalStack() // 信号回调栈 信号处理函数使用的栈
    }
    minitSignalMask() // 初始化信号处理 不深究
    getg().m.procid = uint64(pthread_self())
}

func mstartm0() {
    // Create an extra M for callbacks on threads not created by Go.
    // An extra M is also needed on Windows for callbacks created by
    // syscall.NewCallback. See issue #6751 for details.
    if (iscgo || GOOS == "windows") && !cgoHasExtraM {
        cgoHasExtraM = true
        newextram()
    }
    initsig(false) // 注册信号处理函数 不递归深究
}

startm

寻找空闲的M和P,将P绑定到M中的m.nextp,并且尝试通过m.note唤醒M。当M唤醒后,和m.nextp指定的P绑定。

func startm(_p_ *p, spinning bool) {
    lock(&sched.lock) // 对sched上锁
    if _p_ == nil {
        _p_ = pidleget() // 获取空闲的P
        if _p_ == nil {
            unlock(&sched.lock)
            if spinning {
                // The caller incremented nmspinning, but there are no idle Ps,
                // so it's okay to just undo the increment and give up.
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                    throw("startm: negative nmspinning")
                }
            }
            return
        }
    }
    mp := mget() // 获取空闲的M
    unlock(&sched.lock) // 释放sched中的锁
    if mp == nil { // 如果没有空闲m 则新建一个空闲的m
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        newm(fn, _p_)
        return
    }
    
    // 异常情况抛出异常
    if mp.spinning {
        throw("startm: m is spinning")
    }
    if mp.nextp != 0 {
        throw("startm: m has p")
    }
    if spinning && !runqempty(_p_) {
        throw("startm: p has runnable gs")
    }
    // The caller incremented nmspinning, so set m.spinning in the new M.
    mp.spinning = spinning
    mp.nextp.set(_p_) // 设置M的nextp 当M唤醒后会和m.nextp中的P绑定
    // 通过m.note唤醒M
    notewakeup(&mp.park)
}

stopm

调用stopm时,P和M已经解绑,此时将M投入全局的空闲队列并且伴随物理线程一起休眠。

func stopm() {
    _g_ := getg()

    if _g_.m.locks != 0 {
        throw("stopm holding locks")
    }
    if _g_.m.p != 0 {
        throw("stopm holding p")
    }
    if _g_.m.spinning {
        throw("stopm spinning")
    }

    lock(&sched.lock)
    mput(_g_.m) // m投入全局的空闲m列表中
    unlock(&sched.lock)
    // 线程m将停在 notesleep 中
    notesleep(&_g_.m.park)
    noteclear(&_g_.m.park) // 休眠时m.note.key == 0 当m.note.key != 0时退出休眠 此时回复m.note.key = 0
    acquirep(_g_.m.nextp.ptr()) // 和m.nextp进行绑定
    _g_.m.nextp = 0 // m.nextp设置为0
}

newm

在源码剖析前先分析newm要做什么。
1.创建M对应的结构体。
2.创建和M绑定的g0。
3.创建物理线程进入休眠,并且M和物理线程绑定。

func newm(fn func(), _p_ *p) {
    mp := allocm(_p_, fn) // 根据P和fn创建M
    mp.nextp.set(_p_) // 设置M的nextp
    mp.sigmask = initSigmask
    // 删除部分代码 go 调用C 然后调用 go? 暂不考虑
    newm1(mp)
}

func allocm(_p_ *p, fn func()) *m {
    _g_ := getg()
    acquirem() // disable GC because it can be called from sysmon
    if _g_.m.p == 0 { // 如果P空闲则尝试获取P
        acquirep(_p_) // temporarily borrow p for mallocs in this function
    }

    // Release the free M list. We need to do this somewhere and
    // this may free up a stack we can use.
    //删掉部分代码 释放freem 不太理解为什么要在这里做

    mp := new(m)
    mp.mstartfn = fn
    mcommoninit(mp) // m初始化 不深究

    // In case of cgo or Solaris or illumos or Darwin, pthread_create will make us a stack.
    // Windows and Plan 9 will layout sched stack on OS stack.
    // 初始化g0 注意 有cgo的情况下 g0不分配栈 而是使用物理线程的栈 为什么呢?
    if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {
        mp.g0 = malg(-1)
    } else {
        mp.g0 = malg(8192 * sys.StackGuardMultiplier)
    }
    mp.g0.m = mp

    if _p_ == _g_.m.p.ptr() {
        releasep()
    }
    releasem(_g_.m)

    return mp
}

func malg(stacksize int32) *g {
    newg := new(g)
    if stacksize >= 0 {
        // round2把数值向上调整为2的幂次
        stacksize = round2(_StackSystem + stacksize)
        systemstack(func() {
            // 主要是栈大小的调整和栈内存的实际分配
            newg.stack = stackalloc(uint32(stacksize))
        })
        newg.stackguard0 = newg.stack.lo + _StackGuard
        newg.stackguard1 = ^uintptr(0)
        // Clear the bottom word of the stack. We record g
        // there on gsignal stack during VDSO on ARM and ARM64.
        *(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
    }
    return newg
}

func newm1(mp *m) {
    if iscgo { // 如果是cgo
        // cgo情况下 会将M作为参数传入并且最终调用mstart函数 在mstart中具体分析
        var ts cgothreadstart
        if _cgo_thread_start == nil {
            throw("_cgo_thread_start missing")
        }
        ts.g.set(mp.g0)
        ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
        ts.fn = unsafe.Pointer(funcPC(mstart)) //
        if msanenabled {
            msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
        }
        execLock.rlock() // Prevent process clone.
        asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
        execLock.runlock()
        return
    }
    execLock.rlock() // Prevent process clone.
    newosproc(mp)  // 创建物理线程
    execLock.runlock()
}

// startm 部分节选
func mstart() {
    osStack := _g_.stack.lo == 0 // cgo时 未初始化g0的栈 使用os的栈
    if osStack {
        // Initialize stack bounds from system stack.
        // Cgo may have left stack size in stack.hi.
        // minit may update the stack bounds.
        size := _g_.stack.hi
        if size == 0 {
            size = 8192 * sys.StackGuardMultiplier
        }
        // g0的栈绑定到物理线程的栈上
        _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
        _g_.stack.lo = _g_.stack.hi - size + 1024
    }
    mstart1()
}
func mstart1() {
    if _g_.m != &m0 {
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }
    schedule()
}

// 创建物理线程来执行mstart
func newosproc(mp *m) {
    stk := unsafe.Pointer(mp.g0.stack.hi) // 取g0的栈作为线程的栈
    /*
     * note: strace gets confused if we use CLONE_PTRACE here.
     */
    if false {
        print("newosproc stk=", stk, " m=", mp, " g=", mp.g0, " clone=", funcPC(clone), " id=", mp.id, " ostk=", &mp, "\n")
    }

    // Disable signals during clone, so that the new thread starts
    // with signals disabled. It will enable them in minit.
    var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    // 通过clone创建线程 g0.stack作为栈 mstart作为启动函数
    ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
    sigprocmask(_SIG_SETMASK, &oset, nil)

    if ret < 0 {
        print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n")
        if ret == -_EAGAIN {
            println("runtime: may need to increase max user processes (ulimit -u)")
        }
        throw("newosproc")
    }
}

总结:
1.g0的栈和物理线程使用的栈是统一的。
2.cgo情况下,使用物理线程分配的栈,原因是cgo调用的C的库,C代码都是运行在物理线程上,如果不使用物理线程大小的栈,cgo代码可能在其它语言调用时是正常的,而在go中调用失败,栈溢出。
3.newm创建的M已经在执行schedule函数了,不需要再度唤醒。

findrunnable–spinning

spinning主要是
1.GC,网络,timer相关的处理。
2.尝试从全局的和其它的P中窃取可运行的G。
3.找不到可运行的G则stopm。

func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()
top:
    _p_ := _g_.m.p.ptr()
    // local runq 本地的可运行G队列
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // global runq 全局的可运行G队列
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

    // Steal work from other P's. 从其它P中窃取部分G
    procs := uint32(gomaxprocs)
    ranTimer := false
// 运行中的P有一半是在spinning 则直接stop
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    for i := 0; i < 4; i++ { // 总共找4次 每次随机一个起始的P进行偷取
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            p2 := allp[enum.position()]
            if _p_ == p2 {
                continue
            }
            if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }
stop:
    // wasm only:
    // If a callback returned and no other goroutine is awake,
    // then pause execution until a callback was triggered.
    if beforeIdle(delta) { // 进入stop前回调
        // At least one goroutine got woken.
        goto top
    }

    // 再次检查全局的G运行队列
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    pidleput(_p_)
    unlock(&sched.lock)

    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again 再次尝试从其它P中偷取
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }
    stopm()
    goto top
}

总结

1.M包含物理线程运行所需要的数据,P包含调度G所需要的数据。


发表评论

您的电子邮箱地址不会被公开。