声明
下面的分析均基于Golang1.14版本。
M的状态
M主要有Running和Stop这2个状态,此外还有一个spinning中间态:当M从Running转为Stop时,会先进入spinning状态寻找可运行的G,如果找不到才进入Stop。
主要流程
1.mstart,Go程序初始化时,第一个M是由mstart创建,新的物理线程创建时,调用的函数也是mstart。
2.startm,当有新的G创建或者有G从waiting变为runnable,且还有空闲的P时,会调用startm,获取一个M和空闲的P绑定来执行G。
3.newm,当调用startm时,如果没有空闲的M则会通过newm创建M。
4.stopm,在2种情况下会执行stopm。一是当M绑定的P无可运行的G且无法从其它P窃取可运行的G时,M先进入spinning状态,仍找不到G则调用stopm休眠。二是当M和G进入系统调用后长时间未返回,P被retake;等系统调用返回时M找不到空闲的P绑定,此时M会调用stopm。
5.spinning状态,在findrunnable函数中,会短暂进入spinning状态,如果找不到可运行的G则调用stopm。
PS:上述主要流程解释了函数什么时候由谁触发调用,后面不再赘述。
线程的休眠和唤醒
1.M绑定了一个物理线程,M的running和stop就代表了物理线程的状态,那么物理线程是如何休眠和唤醒的呢?下面以Linux操作系统为例介绍物理线程的变化。
2.Linux线程同步通过futex系统调用实现,详细介绍可参考futex(2)手册页,其函数原型如下。
#include <linux/futex.h> #include <sys/time.h>// 主要关注前3个参数,uaddr表示同步的内存地址。//futex_op表示操作类型,这里使用了FUTEX_WAIT,FUTEX_WAKE这2种类型。//val在FUTEX_WAIT时表示当uaddr指向的值等于val时休眠//val在FUTEX_WAKE时表示唤醒休眠在uaddr上的线程数量(Go中默认是1) int futex(int *uaddr, int futex_op, int val, const struct timespec *timeout, /* or: uint32_t val2 */ int *uaddr2, int val3);
3.线程休眠
下面代码中的asmcgocall(*cgo_yield, nil)是执行cgo_yield函数,具体的不深究。
// notesleep blocks the calling thread on n until n.key becomes non-zero.
// It must run on the current M's g0; otherwise it throws.
func notesleep(n *note) {
	gp := getg()
	// Article note: a thread only goes to sleep when leaving a syscall or from
	// inside schedule, so the current g is expected to be g0.
	if gp != gp.m.g0 {
		throw("notesleep not on g0")
	}
	ns := int64(-1) // negative: futexsleep is called without a timeout
	if *cgo_yield != nil {
		// Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
		ns = 10e6 // with cgo_yield set, wake up every 10ms (10e6 ns)
	}
	for atomic.Load(key32(&n.key)) == 0 { // keep sleeping while note.key == 0
		gp.m.blocked = true
		futexsleep(key32(&n.key), 0, ns) // sleep on the key's address, expected value 0
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil) // run the cgo_yield hook after each wakeup
		}
		gp.m.blocked = false
	}
}

// futexsleep issues a FUTEX_WAIT_PRIVATE futex call on addr with expected
// value val. A negative ns passes no timeout; otherwise ns nanoseconds is
// converted to a timespec and passed as the timeout.
func futexsleep(addr *uint32, val uint32, ns int64) {
	if ns < 0 { // no timeout: sleep until explicitly woken
		futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
		return
	}
	var ts timespec
	ts.setNsec(ns) // set the sleep duration
	futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}
4.线程唤醒
// notewakeup marks n as signalled and wakes one thread sleeping on it.
// It throws if n.key was already non-zero (a double wakeup).
func notewakeup(n *note) {
	old := atomic.Xchg(key32(&n.key), 1) // set note.key to 1; it is 0 while the thread sleeps
	if old != 0 {
		print("notewakeup - double wakeup (", old, ")\n")
		throw("notewakeup - double wakeup")
	}
	futexwakeup(key32(&n.key), 1) // try to wake one waiter on note.key
}

// futexwakeup issues a FUTEX_WAKE_PRIVATE futex call on addr for cnt waiters.
// A non-negative return is treated as success.
func futexwakeup(addr *uint32, cnt uint32) {
	ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0)
	if ret >= 0 {
		return
	}
	// Not expected to be reached in normal operation: print a diagnostic on the
	// system stack, then store to the fixed address 0x1006.
	// NOTE(review): the store is presumably an intentional fault to crash the
	// process — confirm against the runtime source.
	systemstack(func() {
		print("futexwakeup addr=", addr, " returned ", ret, "\n")
	})
	*(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006
}
5.总结 M的休眠和唤醒都是通过M上的note(即m.park)的key进行同步,对M的休眠和唤醒操作都是操作m.park.key所在的内存。
mstart
此时初始化的M为m0,是Go进程的第一个M。
// mstart is the function a new M's thread starts in. It sets up g0's stack
// bounds and stack guards, runs mstart1, and calls mexit if mstart1 returns.
func mstart() {
	_g_ := getg()
	// stack.lo == 0: g0 has no stack bounds recorded yet.
	osStack := _g_.stack.lo == 0
	if osStack {
		// Initialize stack bounds from system stack.
		// Cgo may have left stack size in stack.hi.
		// minit may update the stack bounds.
		size := _g_.stack.hi
		if size == 0 {
			size = 8192 * sys.StackGuardMultiplier
		}
		// Use the address of the local `size` as the top of the stack.
		_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
		_g_.stack.lo = _g_.stack.hi - size + 1024
	}
	// Initialize stack guard so that we can start calling regular
	// Go code.
	_g_.stackguard0 = _g_.stack.lo + _StackGuard
	// This is the g0, so we can also call go:systemstack
	// functions, which check stackguard1.
	_g_.stackguard1 = _g_.stackguard0
	mstart1()
	// Exit this thread.
	switch GOOS {
	case "windows", "solaris", "illumos", "plan9", "darwin", "aix":
		// Windows, Solaris, illumos, Darwin, AIX and Plan 9 always system-allocate
		// the stack, but put it in _g_.stack before mstart,
		// so the logic above hasn't set osStack yet.
		osStack = true
	}
	mexit(osStack)
}

// mstart1 finishes per-thread initialization on g0 and enters schedule.
// It throws if it is not running on the M's g0.
func mstart1() {
	_g_ := getg()
	if _g_ != _g_.m.g0 {
		throw("bad runtime·mstart")
	}
	// Record the caller for use as the top of stack in mcall and
	// for terminating the thread.
	// We're never coming back to mstart1 after we call schedule,
	// so other calls can reuse the current frame.
	save(getcallerpc(), getcallersp()) // save the caller's (mstart's) pc and sp
	asminit()                          // CPU-specific initialization; not covered here
	minit()                            // per-thread init; the minit below sets up the signal stack and signal mask
	// Install signal handlers; after minit so that minit can
	// prepare the thread to be able to handle the signals.
	if _g_.m == &m0 {
		mstartm0() // m0 only: installs the signal handlers
	}
	if fn := _g_.m.mstartfn; fn != nil {
		fn()
	}
	if _g_.m != &m0 {
		// Every M other than m0 binds to the P stored in m.nextp.
		acquirep(_g_.m.nextp.ptr())
		_g_.m.nextp = 0
	}
	schedule()
}

// minit performs per-thread signal setup and records the thread id in
// m.procid.
func minit() {
	// The alternate signal stack is buggy on arm and arm64.
	// The signal handler handles it directly.
	if GOARCH != "arm" && GOARCH != "arm64" {
		minitSignalStack() // the stack used by signal handlers
	}
	minitSignalMask() // initialize the thread's signal mask; not covered here
	getg().m.procid = uint64(pthread_self())
}

// mstartm0 is called from mstart1 only when the current M is m0. It creates
// an extra M when needed and installs the signal handlers.
func mstartm0() {
	// Create an extra M for callbacks on threads not created by Go.
	// An extra M is also needed on Windows for callbacks created by
	// syscall.NewCallback. See issue #6751 for details.
	if (iscgo || GOOS == "windows") && !cgoHasExtraM {
		cgoHasExtraM = true
		newextram()
	}
	initsig(false) // register the signal handlers; not covered further here
}
startm
寻找空闲的M和P,将P记录到M的m.nextp中,并且尝试通过m.park(note)唤醒M。当M唤醒后,和m.nextp指定的P绑定。
// startm hands a P to an M so that it can run goroutines.
//
// If _p_ is nil an idle P is taken from the scheduler; if there is none,
// startm returns, undoing the caller's nmspinning increment when spinning is
// true. If no idle M is available a new one is created with newm; otherwise
// the idle M gets _p_ in m.nextp and is woken through its park note.
func startm(_p_ *p, spinning bool) {
	lock(&sched.lock) // take the scheduler lock
	if _p_ == nil {
		_p_ = pidleget() // get an idle P
		if _p_ == nil {
			unlock(&sched.lock)
			if spinning {
				// The caller incremented nmspinning, but there are no idle Ps,
				// so it's okay to just undo the increment and give up.
				if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
					throw("startm: negative nmspinning")
				}
			}
			return
		}
	}
	mp := mget()        // get an idle M
	unlock(&sched.lock) // release the scheduler lock
	if mp == nil {      // no idle M: create a new one
		var fn func()
		if spinning {
			// The caller incremented nmspinning, so set m.spinning in the new M.
			fn = mspinning
		}
		newm(fn, _p_)
		return
	}
	// Sanity checks: an idle M must not be spinning or already hold a next P,
	// and a spinning start requires _p_'s run queue to be empty.
	if mp.spinning {
		throw("startm: m is spinning")
	}
	if mp.nextp != 0 {
		throw("startm: m has p")
	}
	if spinning && !runqempty(_p_) {
		throw("startm: p has runnable gs")
	}
	// The caller incremented nmspinning, so set m.spinning in the new M.
	mp.spinning = spinning
	mp.nextp.set(_p_) // the M binds to this P once it wakes up (see stopm)
	// Wake the M through its park note.
	notewakeup(&mp.park)
}
stopm
调用stopm时,P和M已经解绑,此时将M投入全局的空闲队列并且伴随物理线程一起休眠。
// stopm parks the current M: it puts the M on the scheduler's idle list and
// sleeps on m.park. After being woken it binds to the P stored in m.nextp.
// It throws if the M holds locks, still has a P, or is spinning.
func stopm() {
	_g_ := getg()
	if _g_.m.locks != 0 {
		throw("stopm holding locks")
	}
	if _g_.m.p != 0 {
		throw("stopm holding p")
	}
	if _g_.m.spinning {
		throw("stopm spinning")
	}
	lock(&sched.lock)
	mput(_g_.m) // put this M on the global idle-M list
	unlock(&sched.lock)
	// The thread blocks inside notesleep until the park note is signalled.
	notesleep(&_g_.m.park)
	// The note's key is 0 while sleeping and non-zero once woken; reset it to 0
	// for the next sleep.
	noteclear(&_g_.m.park)
	acquirep(_g_.m.nextp.ptr()) // bind to the P stored in m.nextp
	_g_.m.nextp = 0             // clear m.nextp
}
newm
在源码剖析前先分析newm要做什么。
1.创建M对应的结构体。
2.创建和M绑定的g0。
3.创建物理线程并以mstart作为入口函数开始执行,M和该物理线程绑定。
// newm creates a new M that will start by running fn (if non-nil) and then
// bind to _p_, and starts a thread for it via newm1.
func newm(fn func(), _p_ *p) {
	mp := allocm(_p_, fn) // allocate the M for this P and fn
	mp.nextp.set(_p_)     // the new M binds to this P in mstart1
	mp.sigmask = initSigmask
	// Article note: some code was removed here (the Go -> C -> Go call path);
	// not considered in this walkthrough.
	newm1(mp)
}

// allocm allocates and initializes a new m together with its g0.
// fn is stored as the M's mstartfn. If the calling M has no P, _p_ is
// borrowed for the duration of the call.
func allocm(_p_ *p, fn func()) *m {
	_g_ := getg()
	acquirem()        // disable GC because it can be called from sysmon
	if _g_.m.p == 0 { // the calling M currently has no P
		acquirep(_p_) // temporarily borrow p for mallocs in this function
	}
	// Release the free M list. We need to do this somewhere and
	// this may free up a stack we can use.
	// Article note: the code releasing freem was removed from this excerpt.
	mp := new(m)
	mp.mstartfn = fn
	mcommoninit(mp) // common M initialization; not covered here
	// In case of cgo or Solaris or illumos or Darwin, pthread_create will make us a stack.
	// Windows and Plan 9 will layout sched stack on OS stack.
	// Initialize g0. With cgo (or the listed OSes) malg(-1) allocates no Go
	// stack for g0; the thread's own stack is used instead.
	if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {
		mp.g0 = malg(-1)
	} else {
		mp.g0 = malg(8192 * sys.StackGuardMultiplier)
	}
	mp.g0.m = mp
	// Give back _p_ if the calling M is holding it.
	if _p_ == _g_.m.p.ptr() {
		releasep()
	}
	releasem(_g_.m)
	return mp
}

// malg allocates a new g. When stacksize >= 0 it also allocates a stack for
// it and sets the stack guards; a negative stacksize leaves the stack unset.
func malg(stacksize int32) *g {
	newg := new(g)
	if stacksize >= 0 {
		// Article note: round2 rounds the value up to a power of two.
		stacksize = round2(_StackSystem + stacksize)
		systemstack(func() {
			// The actual stack memory allocation.
			newg.stack = stackalloc(uint32(stacksize))
		})
		newg.stackguard0 = newg.stack.lo + _StackGuard
		newg.stackguard1 = ^uintptr(0)
		// Clear the bottom word of the stack. We record g
		// there on gsignal stack during VDSO on ARM and ARM64.
		*(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
	}
	return newg
}

// newm1 starts the thread for mp: through _cgo_thread_start when cgo is in
// use, otherwise through newosproc. Both paths hold execLock for reading
// around thread creation.
func newm1(mp *m) {
	if iscgo {
		// With cgo, a cgothreadstart carrying mp.g0, mp's TLS slot and the
		// address of mstart is handed to _cgo_thread_start.
		var ts cgothreadstart
		if _cgo_thread_start == nil {
			throw("_cgo_thread_start missing")
		}
		ts.g.set(mp.g0)
		ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
		ts.fn = unsafe.Pointer(funcPC(mstart))
		if msanenabled {
			msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
		}
		execLock.rlock() // Prevent process clone.
		asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
		execLock.runlock()
		return
	}
	execLock.rlock() // Prevent process clone.
	newosproc(mp)    // create the OS thread
	execLock.runlock()
}

// Excerpt of mstart (abridged by the article; _g_ is not declared here).
func mstart() {
	// With cgo, g0's stack was not initialized, so the OS thread's stack is used.
	osStack := _g_.stack.lo == 0
	if osStack {
		// Initialize stack bounds from system stack.
		// Cgo may have left stack size in stack.hi.
		// minit may update the stack bounds.
		size := _g_.stack.hi
		if size == 0 {
			size = 8192 * sys.StackGuardMultiplier
		}
		// Point g0's stack bounds at the OS thread's stack.
		_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
		_g_.stack.lo = _g_.stack.hi - size + 1024
	}
	mstart1()
}

// Excerpt of mstart1 (abridged by the article): every M other than m0 binds
// to the P stored in m.nextp and then enters schedule.
func mstart1() {
	if _g_.m != &m0 {
		acquirep(_g_.m.nextp.ptr())
		_g_.m.nextp = 0
	}
	schedule()
}

// newosproc creates an OS thread that runs mstart for mp.
// It throws if clone fails.
func newosproc(mp *m) {
	stk := unsafe.Pointer(mp.g0.stack.hi) // g0's stack is used as the thread's stack
	/*
	 * note: strace gets confused if we use CLONE_PTRACE here.
	 */
	if false {
		print("newosproc stk=", stk, " m=", mp, " g=", mp.g0, " clone=", funcPC(clone), " id=", mp.id, " ostk=", &mp, "\n")
	}
	// Disable signals during clone, so that the new thread starts
	// with signals disabled. It will enable them in minit.
	var oset sigset
	sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
	// Create the thread with clone: g0's stack as its stack, mstart as its entry function.
	ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
	sigprocmask(_SIG_SETMASK, &oset, nil) // restore the previous signal mask
	if ret < 0 {
		print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n")
		if ret == -_EAGAIN {
			println("runtime: may need to increase max user processes (ulimit -u)")
		}
		throw("newosproc")
	}
}
总结:
1.g0的栈和物理线程使用的栈是统一的。
2.cgo情况下,g0使用物理线程自身分配的栈。原因是cgo调用的C库代码都运行在物理线程的栈上,如果不使用物理线程大小的栈,同样的C代码在其它语言中调用是正常的,而在Go中调用却可能因栈溢出而失败。
3.newm创建的M已经在执行schedule函数了,不需要再度唤醒。
findrunnable–spinning
spinning阶段主要做以下几件事:
1.GC,网络,timer相关的处理。
2.尝试从全局的和其它的P中窃取可运行的G。
3.找不到可运行的G则stopm。
// findrunnable looks for a runnable goroutine for the current M
// (abridged excerpt from the article).
//
// It checks the local run queue, then the global run queue, then tries to
// steal from other Ps while spinning. If nothing is found it releases its P,
// stops spinning, rechecks the run queues once, and finally parks in stopm
// before retrying from the top.
//
// NOTE(review): this excerpt is not self-contained. delta and allpSnapshot
// are used but never defined, and no lock(&sched.lock) is visible before the
// unlock(&sched.lock) calls after the stop label; presumably the article
// elided those lines — confirm against the Go 1.14 runtime source.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()
top:
	_p_ := _g_.m.p.ptr()
	// local runq: this P's own queue of runnable Gs
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}
	// global runq: the scheduler-wide queue of runnable Gs
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}
	// Steal work from other P's.
	procs := uint32(gomaxprocs)
	ranTimer := false
	// If this M is not spinning and twice the number of spinning Ms is at
	// least the number of non-idle Ps (procs - npidle), skip stealing.
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
	}
	for i := 0; i < 4; i++ { // four passes, each starting from a random P
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			p2 := allp[enum.position()]
			if _p_ == p2 {
				continue
			}
			if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
				return gp, false
			}
		}
	}
stop:
	// wasm only:
	// If a callback returned and no other goroutine is awake,
	// then pause execution until a callback was triggered.
	if beforeIdle(delta) { // hook run before going idle
		// At least one goroutine got woken.
		goto top
	}
	// Check the global run queue once more.
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	// Give up the P and put it on the idle list.
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	pidleput(_p_)
	unlock(&sched.lock)
	// Remember whether this M was spinning, then leave the spinning state.
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}
	// check all runqueues once again: if any P has runnable Gs, try to grab an
	// idle P and restart the search from the top.
	for _, _p_ := range allpSnapshot {
		if !runqempty(_p_) {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				if wasSpinning {
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				goto top
			}
			break
		}
	}
	// Nothing to run: park this M; after it is woken, search again.
	stopm()
	goto top
}
总结
1.M包含物理线程运行所需要的数据,P包含调度G所需要的数据。
文章来源:智云一二三科技
文章标题:M的状态转换
文章地址:https://www.zhihuclub.com/242.shtml