您的位置 首页 golang

M的状态转换


声明

下面的分析均基于Golang1.14版本。

M的状态

M只有Running和Stop这2个状态,还有一个spinning中间态,当从Running转为Stop时,会先spinning寻找可运行的G,如果找不到则进入Stop。

M状态转换

主要流程

1.mstart,Go程序初始化时,第一个M是由mstart创建,新的物理线程创建时,调用的函数也是mstart。
2.startm,当有新的G创建或者有G从waiting进入runnable(可运行)且还有空闲的P,此时会调用startm,获取一个M和空闲的P绑定执行G。
3.newm,当调用startm时,如果没有空闲的M则会通过newm创建M。
4.stopm,在2种情况下会执行stopm,一是当M绑定的P无可运行的G且无法从其它P窃取可运行的G时,M先进入spinning状态,仍然找不到则调用stopm休眠。二是当M和G进入系统调用后,长时间未退出,P被retake,M退出系统调用时找不到空闲的P绑定,此时M会调用stopm。
5.spinning状态,在findrunnable函数中,会短暂进入spinning状态,如果找不到可运行的G则调用stopm。
PS:上述主要流程解释了函数什么时候由谁触发调用,后面不再赘述。

线程的休眠和唤醒

1.M绑定了一个物理线程,M的running和stop就代表了物理线程的状态,那么物理线程是如何休眠和唤醒的呢?下面以Linux操作系统为例介绍物理线程的变化。
2.Linux线程同步通过futex系统调用实现,futex详细介绍

#include <linux/futex.h>       #include <sys/time.h>// 主要关注前3个参数,uaddr表示同步的内存地址。//futex_op表示操作类型,这里使用了FUTEX_WAIT,FUTEX_WAKE这2种类型。//val在FUTEX_WAIT时表示当uaddr指向的值等于val时休眠//val在FUTEX_WAKE时表示唤醒休眠在uaddr上的线程数量(Go中默认是1)       int futex(int *uaddr, int futex_op, int val,                 const struct timespec *timeout,   /* or: uint32_t val2 */                 int *uaddr2, int val3);

3.线程休眠
下面代码中的asmcgocall(*cgo_yield, nil)用于执行cgo_yield函数,具体的不深究。

// notesleep puts the calling thread to sleep on note n until n.key becomes
// non-zero. It must be called on g0; otherwise it throws.
func notesleep(n *note) {
	gp := getg()
	// Per the article: a thread only goes to sleep when leaving a syscall or
	// from schedule, so the current g is expected to be g0.
	if gp != gp.m.g0 {
		throw("notesleep not on g0")
	}
	ns := int64(-1) // negative timeout: sleep with no timed wakeup
	if *cgo_yield != nil {
		// Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
		ns = 10e6 // when cgo_yield is set, wake up every 10ms
	}
	for atomic.Load(key32(&n.key)) == 0 { // keep sleeping while note.key == 0
		gp.m.blocked = true
		futexsleep(key32(&n.key), 0, ns) // sleep while *key == 0
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil) // run the cgo_yield hook after each wakeup
		}
		gp.m.blocked = false
	}
}

// futexsleep issues a FUTEX_WAIT on addr with expected value val.
// A negative ns waits without a timeout; otherwise ns is the timeout in
// nanoseconds.
func futexsleep(addr *uint32, val uint32, ns int64) {
	if ns < 0 { // ns < 0: sleep with no timeout
		futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
		return
	}
	var ts timespec
	ts.setNsec(ns) // set the sleep duration
	futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}

4.线程唤醒

// notewakeup sets n.key to 1 and wakes one thread sleeping on it.
// It throws if the key was already non-zero (a double wakeup).
func notewakeup(n *note) {
	old := atomic.Xchg(key32(&n.key), 1) // set note.key to 1; it is 0 while the thread sleeps
	if old != 0 {
		print("notewakeup - double wakeup (", old, ")\n")
		throw("notewakeup - double wakeup")
	}
	futexwakeup(key32(&n.key), 1) // try to wake one sleeper on note.key
}

// futexwakeup issues a FUTEX_WAKE on addr for up to cnt sleepers.
// A negative return from futex is treated as fatal.
func futexwakeup(addr *uint32, cnt uint32) {
	ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
	if ret >= 0 {
		return
	}
	// Not expected to be reached in normal operation.
	systemstack(func() {
		print("futexwakeup addr=", addr, " returned ", ret, "\n")
	})
	// Writes to the fixed address 0x1006 — presumably a deliberate crash.
	*(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}

5.总结 M的休眠和唤醒都是通过m.park(note类型)的key进行同步,对M的休眠和唤醒操作都是操作m.park.key所在的内存。

mstart

mstart是所有M的入口函数:Go进程的第一个M(m0)以及之后每个新建的物理线程都从mstart开始执行。下面的代码通过_g_.m == &m0来区分m0和其它M。

// mstart is the entry point of an M. It sets up g0's stack bounds and stack
// guards, runs mstart1, and calls mexit once mstart1 returns.
func mstart() {
	_g_ := getg()
	osStack := _g_.stack.lo == 0 // no stack bounds recorded yet
	if osStack {
		// Initialize stack bounds from system stack.
		// Cgo may have left stack size in stack.hi.
		// minit may update the stack bounds.
		size := _g_.stack.hi
		if size == 0 {
			size = 8192 * sys.StackGuardMultiplier
		}
		// The address of the local variable size is taken as the top of the stack.
		_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
		_g_.stack.lo = _g_.stack.hi - size + 1024
	}
	// Initialize stack guard so that we can start calling regular
	// Go code.
	_g_.stackguard0 = _g_.stack.lo + _StackGuard
	// This is the g0, so we can also call go:systemstack
	// functions, which check stackguard1.
	_g_.stackguard1 = _g_.stackguard0
	mstart1()
	// Exit this thread.
	switch GOOS {
	case "windows", "solaris", "illumos", "plan9", "darwin", "aix":
		// Windows, Solaris, illumos, Darwin, AIX and Plan 9 always system-allocate
		// the stack, but put it in _g_.stack before mstart,
		// so the logic above hasn't set osStack yet.
		osStack = true
	}
	mexit(osStack)
}

// mstart1 finishes starting an M on g0: it saves the caller's pc/sp, runs
// per-thread initialization, installs signal handlers on m0, runs the M's
// mstartfn if set, binds nextp on every M other than m0, and enters schedule.
func mstart1() {
	_g_ := getg()
	if _g_ != _g_.m.g0 {
		throw("bad runtime·mstart")
	}
	// Record the caller for use as the top of stack in mcall and
	// for terminating the thread.
	// We're never coming back to mstart1 after we call schedule,
	// so other calls can reuse the current frame.
	save(getcallerpc(), getcallersp()) // saves the pc/sp of mstart's call to mstart1 (into g0, per the article)
	asminit()                          // CPU-specific initialization; not covered here
	minit()                            // per-thread signal stack and signal mask setup
	// Install signal handlers; after minit so that minit can
	// prepare the thread to be able to handle the signals.
	if _g_.m == &m0 {
		mstartm0() // m0 only: installs the signal handlers
	}
	if fn := _g_.m.mstartfn; fn != nil {
		fn()
	}
	if _g_.m != &m0 {
		// Any M other than m0 binds the P stored in nextp before scheduling.
		acquirep(_g_.m.nextp.ptr())
		_g_.m.nextp = 0
	}
	schedule()
}

// minit performs per-thread initialization: signal stack (except on
// arm/arm64), signal mask, and recording the thread id in m.procid.
func minit() {
	// The alternate signal stack is buggy on arm and arm64.
	// The signal handler handles it directly.
	if GOARCH != "arm" && GOARCH != "arm64" {
		minitSignalStack() // stack used by signal handlers (per the article)
	}
	minitSignalMask() // signal mask initialization; not examined here
	getg().m.procid = uint64(pthread_self())
}

// mstartm0 is the m0-only part of M startup: it creates the extra M needed
// for callbacks (cgo or Windows) and calls initsig.
func mstartm0() {
	// Create an extra M for callbacks on threads not created by Go.
	// An extra M is also needed on Windows for callbacks created by
	// syscall.NewCallback. See issue #6751 for details.
	if (iscgo || GOOS == "windows") && !cgoHasExtraM {
		cgoHasExtraM = true
		newextram()
	}
	initsig(false) // registers the signal handlers (per the article); not examined further
}

startm

寻找空闲的M和P,将P记录到M的m.nextp中,并且通过m.park(note类型)唤醒M。当M唤醒后,和m.nextp指定的P绑定。如果没有空闲的M,则调用newm新建一个M。

// startm gets an M running on P _p_. If _p_ is nil it takes an idle P, and
// returns if none is available. It then wakes an idle M if there is one,
// or creates a new one with newm.
// spinning reports that the caller already incremented sched.nmspinning.
func startm(_p_ *p, spinning bool) {
	lock(&sched.lock) // take the scheduler lock
	if _p_ == nil {
		_p_ = pidleget() // get an idle P
		if _p_ == nil {
			unlock(&sched.lock)
			if spinning {
				// The caller incremented nmspinning, but there are no idle Ps,
				// so it's okay to just undo the increment and give up.
				if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
					throw("startm: negative nmspinning")
				}
			}
			return
		}
	}
	mp := mget()        // get an idle M
	unlock(&sched.lock) // release the scheduler lock
	if mp == nil {      // no idle M: create a new one
		var fn func()
		if spinning {
			// The caller incremented nmspinning, so set m.spinning in the new M.
			fn = mspinning
		}
		newm(fn, _p_)
		return
	}
	// Sanity checks: an idle M must not be spinning or already hold a nextp,
	// and a spinning start must not be handed a P with runnable Gs.
	if mp.spinning {
		throw("startm: m is spinning")
	}
	if mp.nextp != 0 {
		throw("startm: m has p")
	}
	if spinning && !runqempty(_p_) {
		throw("startm: p has runnable gs")
	}
	// The caller incremented nmspinning, so set m.spinning in the new M.
	mp.spinning = spinning
	mp.nextp.set(_p_) // the M binds to this P after it wakes up
	// Wake the M through its park note.
	notewakeup(&mp.park)
}

stopm

调用stopm时,P和M已经解绑,此时将M投入全局的空闲队列并且伴随物理线程一起休眠。

// stopm parks the current M: it puts the M on the idle list, sleeps on
// m.park until woken, then binds the P left in m.nextp.
// The M must hold no locks and no P, and must not be spinning.
func stopm() {
	_g_ := getg()
	if _g_.m.locks != 0 {
		throw("stopm holding locks")
	}
	if _g_.m.p != 0 {
		throw("stopm holding p")
	}
	if _g_.m.spinning {
		throw("stopm spinning")
	}
	lock(&sched.lock)
	mput(_g_.m) // put this M on the global idle M list
	unlock(&sched.lock)
	// The thread blocks inside notesleep until another thread wakes the note.
	notesleep(&_g_.m.park)
	// The key is 0 while asleep and non-zero on wakeup; noteclear resets it
	// (to 0, per the article) so the note can be reused.
	noteclear(&_g_.m.park)
	acquirep(_g_.m.nextp.ptr()) // bind the P stored in m.nextp
	_g_.m.nextp = 0             // clear m.nextp
}

newm

在源码剖析前先分析newm要做什么。
1.创建M对应的结构体。
2.创建和M绑定的g0。
3.创建物理线程并与M绑定,新线程从mstart开始执行并最终进入schedule(并不会先进入休眠)。

// newm creates a new M that will start by running fn (if non-nil), with _p_
// recorded as its nextp, and starts an OS thread for it via newm1.
func newm(fn func(), _p_ *p) {
	mp := allocm(_p_, fn) // allocate the M for this P and fn
	mp.nextp.set(_p_)     // record the P the new M should bind
	mp.sigmask = initSigmask
	// (Part of the original code is omitted in this excerpt: the Go -> C -> Go
	// path is not considered here.)
	newm1(mp)
}

// allocm allocates a new m with its g0 and returns it. _p_ is borrowed
// temporarily when the current M has no P; fn becomes the new M's mstartfn.
func allocm(_p_ *p, fn func()) *m {
	_g_ := getg()
	acquirem() // disable GC because it can be called from sysmon
	if _g_.m.p == 0 { // the current M has no P
		acquirep(_p_) // temporarily borrow p for mallocs in this function
	}
	// Release the free M list. We need to do this somewhere and
	// this may free up a stack we can use.
	// (The code that releases the free M list is omitted in this excerpt.)
	mp := new(m)
	mp.mstartfn = fn
	mcommoninit(mp) // common M initialization; not examined here
	// In case of cgo or Solaris or illumos or Darwin, pthread_create will make us a stack.
	// Windows and Plan 9 will layout sched stack on OS stack.
	// Create g0. With cgo (and on the OSes listed) malg(-1) allocates no Go
	// stack, so g0 runs on the OS thread's own stack.
	if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {
		mp.g0 = malg(-1)
	} else {
		mp.g0 = malg(8192 * sys.StackGuardMultiplier)
	}
	mp.g0.m = mp
	if _p_ == _g_.m.p.ptr() {
		// The current M's P is _p_ (borrowed above, presumably): release it.
		releasep()
	}
	releasem(_g_.m)
	return mp
}

// malg allocates a new g. When stacksize >= 0 it also allocates a stack of
// round2(_StackSystem + stacksize) bytes and sets the stack guards; a
// negative stacksize leaves the g without a stack.
func malg(stacksize int32) *g {
	newg := new(g)
	if stacksize >= 0 {
		// round2 rounds the value up to a power of two (per the article).
		stacksize = round2(_StackSystem + stacksize)
		systemstack(func() {
			// Size adjustment and the actual stack allocation happen here.
			newg.stack = stackalloc(uint32(stacksize))
		})
		newg.stackguard0 = newg.stack.lo + _StackGuard
		newg.stackguard1 = ^uintptr(0)
		// Clear the bottom word of the stack. We record g
		// there on gsignal stack during VDSO on ARM and ARM64.
		*(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
	}
	return newg
}

// newm1 starts the OS thread for mp: through _cgo_thread_start when cgo is
// in use, otherwise through newosproc.
func newm1(mp *m) {
	if iscgo { // cgo path
		// g0, the TLS slot and mstart are packed into ts and handed to
		// _cgo_thread_start; the new thread ends up running mstart
		// (see the mstart excerpt below).
		var ts cgothreadstart
		if _cgo_thread_start == nil {
			throw("_cgo_thread_start missing")
		}
		ts.g.set(mp.g0)
		ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
		ts.fn = unsafe.Pointer(funcPC(mstart)) // thread entry function
		if msanenabled {
			msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
		}
		execLock.rlock() // Prevent process clone.
		asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
		execLock.runlock()
		return
	}
	execLock.rlock() // Prevent process clone.
	newosproc(mp)    // create the OS thread
	execLock.runlock()
}

// Abridged excerpt of mstart (the article labels it "startm").
// NOTE(review): _g_ is used without a declaration here; the `_g_ := getg()`
// line was dropped from the excerpt.
func mstart() {
	osStack := _g_.stack.lo == 0 // with cgo, g0's stack was not initialized; the OS stack is used
	if osStack {
		// Initialize stack bounds from system stack.
		// Cgo may have left stack size in stack.hi.
		// minit may update the stack bounds.
		size := _g_.stack.hi
		if size == 0 {
			size = 8192 * sys.StackGuardMultiplier
		}
		// Point g0's stack bounds at the OS thread's stack.
		_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
		_g_.stack.lo = _g_.stack.hi - size + 1024
	}
	mstart1()
}

// Abridged excerpt of mstart1: every M other than m0 binds the P left in
// nextp and then enters schedule.
// NOTE(review): _g_ is likewise undeclared in this excerpt.
func mstart1() {
	if _g_.m != &m0 {
		acquirep(_g_.m.nextp.ptr())
		_g_.m.nextp = 0
	}
	schedule()
}

// newosproc creates an OS thread with clone that runs mstart for mp, using
// the top of g0's stack as the thread stack. It throws if clone fails.
func newosproc(mp *m) {
	stk := unsafe.Pointer(mp.g0.stack.hi) // top of g0's stack becomes the thread's stack
	/*
	 * note: strace gets confused if we use CLONE_PTRACE here.
	 */
	if false {
		print("newosproc stk=", stk, " m=", mp, " g=", mp.g0, " clone=", funcPC(clone), " id=", mp.id, " ostk=", &mp, "\n")
	}
	// Disable signals during clone, so that the new thread starts
	// with signals disabled. It will enable them in minit.
	var oset sigset
	sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
	// Create the thread with clone: g0's stack is the stack, mstart the entry function.
	ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
	sigprocmask(_SIG_SETMASK, &oset, nil) // restore the previous signal mask
	if ret < 0 {
		print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n")
		if ret == -_EAGAIN {
			println("runtime: may need to increase max user processes (ulimit -u)")
		}
		throw("newosproc")
	}
}

总结:
1.g0的栈和物理线程使用的栈是统一的。
2.cgo情况下,g0直接使用物理线程自带的栈(由pthread_create分配)。原因是cgo调用的C库代码运行在物理线程的栈上,如果这个栈比普通物理线程的栈小,同样的C代码在其它语言中调用是正常的,而在Go中调用却可能栈溢出。
3.newm创建的M已经在执行schedule函数了,不需要再度唤醒。

findrunnable–spinning

spinning主要是
1.GC,网络,timer相关的处理。
2.尝试从全局的和其它的P中窃取可运行的G。
3.找不到可运行的G则stopm。

// findrunnable looks for a runnable G for the current M (abridged excerpt).
// It tries, in order: the local run queue, the global run queue, and
// stealing from other Ps while spinning. If nothing is found it gives up its
// P, re-checks the run queues once, and otherwise parks in stopm, retrying
// from the top after being woken.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()
top:
	_p_ := _g_.m.p.ptr()
	// local runq: this P's own queue of runnable Gs
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}
	// global runq: the scheduler-wide queue of runnable Gs
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}
	// Steal work from other P's.
	procs := uint32(gomaxprocs)
	ranTimer := false // NOTE(review): unused in this excerpt
	// If this M is not spinning and the spinning Ms already amount to at least
	// half of the non-idle Ps (gomaxprocs - npidle), skip stealing and go
	// straight to stop.
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
	}
	for i := 0; i < 4; i++ { // up to 4 passes, each enumerating Ps from a random starting point
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			p2 := allp[enum.position()]
			if _p_ == p2 {
				continue
			}
			if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
				return gp, false
			}
		}
	}
stop:
	// wasm only:
	// If a callback returned and no other goroutine is awake,
	// then pause execution until a callback was triggered.
	// NOTE(review): delta is not declared anywhere in this excerpt.
	if beforeIdle(delta) { // hook called before going idle
		// At least one goroutine got woken.
		goto top
	}
	// Check the global run queue once more.
	// NOTE(review): the unlock calls below have no visible matching
	// lock(&sched.lock); it appears to have been dropped from the excerpt.
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	pidleput(_p_) // the released P goes onto the idle P list
	unlock(&sched.lock)
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}
	// check all runqueues once again: if any P has runnable Gs, try to take
	// an idle P and start over
	// NOTE(review): allpSnapshot is not declared anywhere in this excerpt.
	for _, _p_ := range allpSnapshot {
		if !runqempty(_p_) {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				if wasSpinning {
					// Restore the spinning state that was cleared above.
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				goto top
			}
			break
		}
	}
	stopm() // nothing to run: park this M until it is woken
	goto top
}

总结

1.M包含物理线程运行所需要的数据,P包含调度G所需要的数据。


文章来源:智云一二三科技

文章标题:M的状态转换

文章地址:https://www.zhihuclub.com/242.shtml

关于作者: 智云科技

热门文章

网站地图