监控线程是在runtime.main执行的时候在系统栈中创建的,监控线程与普通的工作线程区别在于,监控线程不需要绑定p来运行。
监控线程的创建与启动
简单的调用图
先给出个简单的调用图,好心里有数,逐个分析完后做个小结。
主体代码
以下会合并小篇幅且易懂的代码段,个人认为重点的会单独摘出来。
main->newm->newm1->newosproc
func main() { ...... if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon systemstack(func() { newm(sysmon, nil) }) } ......}func newm(fn func(), _p_ *p) { mp := allocm(_p_, fn) // 分配一个m mp.nextp.set(_p_) mp.sigmask = initSigmask ...... newm1(mp)}func newm1(mp *m) { ...... execLock.rlock() // Prevent process clone. newosproc(mp) execLock.runlock()}cloneFlags = _CLONE_VM | /* share memory */ _CLONE_FS | /* share cwd, etc */ _CLONE_FILES | /* share fd table */ _CLONE_SIGHAND | /* share sig handler table */ _CLONE_SYSVSEM | /* share SysV semaphore undo lists (see issue #20763) */ _CLONE_THREAD /* revisit - okay for now */func newosproc(mp *m) { stk := unsafe.Pointer(mp.g0.stack.hi) ...... sigprocmask(_SIG_SETMASK, &sigset_all, &oset) // 这里注意一下,mstart会被作为工作线程的开始,在runtime.clone中会被调用。 ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) sigprocmask(_SIG_SETMASK, &oset, nil) ......}
allocm
在此场景中其工作是new一个m,m.mstartfn = sysmon。
分配一个g与mp相互绑定,这个g就是mp的g0。但不是全局变量的那个g0,全局变量g0是m0的m.g0。
func allocm(_p_ *p, fn func()) *m { _g_ := getg() acquirem() // disable GC because it can be called from sysmon // 忽略sysmon不会执行的代码 mp := new(m) // 新建一个m mp.mstartfn = fn // fn 指向 sysmon mcommoninit(mp) // 之前的文章有分析过,做一些初始化工作。 if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" { mp.g0 = malg(-1) } else { mp.g0 = malg(8192 * sys.StackGuardMultiplier) // 分配一个g } mp.g0.m = mp ...... releasem(_g_.m) return mp}
runtime.clone
调用clone,内核会创建出一个子线程,返回两次。返回0是子线程,否则是父线程。
效果与fork类似,其实是fork封装了clone。
// int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void));TEXT runtime·clone(SB),NOSPLIT,$0 // 准备clone系统调用的参数 MOVL flags+0(FP), DI MOVQ stk+8(FP), SI MOVQ $0, DX MOVQ $0, R10 // 从父进程栈复制mp, gp, fn。子线程会用到。 MOVQ mp+16(FP), R8 MOVQ gp+24(FP), R9 MOVQ fn+32(FP), R12 // 调用clone MOVL $SYS_clone, AX SYSCALL // 父线程,返回. CMPQ AX, $0 JEQ 3(PC) MOVL AX, ret+40(FP) RET // 子线程,设置栈顶 MOVQ SI, SP // If g or m are nil, skip Go-related setup. CMPQ R8, $0 // m JEQ nog CMPQ R9, $0 // g JEQ nog // 调用系统调用 gettid 获取线程id初始化 mp.procid MOVL $SYS_gettid, AX SYSCALL MOVQ AX, m_procid(R8) // 设置线程tls LEAQ m_tls(R8), DI CALL runtime·settls(SB) // In child, set up new stack get_tls(CX) MOVQ R8, g_m(R9) // gp.m = mp MOVQ R9, g(CX) // mp.tls[0] = gp CALL runtime·stackcheck(SB)nog: // Call fn CALL R12 // 调用fn,此处是mstart,永不返回。 // It shouldn't return. If it does, exit that thread. MOVL $111, DI MOVL $SYS_exit, AX SYSCALL JMP -3(PC) // keep exiting
总结一下clone的工作:
- 准备系统调用clone的参数
- 将mp,gp,fn从父线程栈复制到寄存器中,给子线程用
- 调用clone
- 父线程返回
- 子线程设置 m.procid、tls、gp,mp互相绑定、调用fn
调用sysmon
在newosproc中调用clone,并将 mstart 的地址传入。也就是整个线程开始执行。
mstart 与 mstart1 在之前的文章有分析过,现在来看一下与本文有关的段落。
func mstart1() { _g_ := getg() save(getcallerpc(), getcallersp()) asminit() minit() // 之前初始化时的调用逻辑是 rt0_go->mstart->mstart1,当时这里的fn == nil。所以会继续向下走,进入调度循环。 // 现在调用逻辑是通过 newm(sysmon, nil)->allocm 中设置了 mp.mstartfn 为 sysmon的指针。所以下面的 fn 就不是 nil 了 // fn != nil 调用 sysmon,并且sysmon永不会返回。也就是说不会走到下面schedule中。 if fn := _g_.m.mstartfn; fn != nil { fn() } ...... schedule()}
小结
监控线程通过在runtime.main中调用newm(sysmon, nil)创建。
- newm:调用了allocm 获得了mp。
- allocm:new了一个m,也就是前面的mp。并且将 mp.mstartfn 赋值为 sysmon的指针,这很重要,后面会用。
- newm->newm1->newosproc->runtime.clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
- runtime.clone:准备系统调用clone的参数;从父线程栈复制mp,gp,fn到寄存器;调用clone;父线程返回;子线程设置sp,m.procid,tls,互相绑定mp与gp。调用mstart作为子线程的开始执行。
- mstart->mstart1:调用 _g_.m.mstartfn 指向的函数,也就是sysmon,此时监控工作正式开始。
抢占调度
主体代码
sysmon开始会检查死锁,接下来是函数主体,一个无限循环,每隔一个短时间执行一次。其工作包含网络轮询、抢占调度、垃圾回收。
sysmon中抢占调度代码
func sysmon() { lock(&sched.lock) sched.nmsys++ checkdead() unlock(&sched.lock) lasttrace := int64(0) idle := 0 // how many cycles in succession we had not wokeup somebody delay := uint32(0) // 睡眠时间,开始是20微秒;idle大于50后,翻倍增长;但最大为10毫秒 for { if idle == 0 { // start with 20us sleep... delay = 20 } else if idle > 50 { // start doubling the sleep after 1ms... delay *= 2 } if delay > 10*1000 { // up to 10ms delay = 10 * 1000 } usleep(delay) now := nanotime() ...... // retake P's blocked in syscalls // and preempt long running G's if retake(now) != 0 { idle = 0 } else { idle++ } ...... }}
retake
- preemptone:抢占运行时间过长的G。
- handoffp:尝试为过长时间处在_Psyscall的P关联一个M继续调度。
func retake(now int64) uint32 { n := 0 lock(&allpLock) for i := 0; i < len(allp); i++ { _p_ := allp[i] if _p_ == nil { continue } pd := &_p_.sysmontick s := _p_.status sysretake := false if s == _Prunning || s == _Psyscall { // 如果运行时间太长,则抢占g t := int64(_p_.schedtick) if int64(pd.schedtick) != t { pd.schedtick = uint32(t) pd.schedwhen = now } else if pd.schedwhen+forcePreemptNS <= now { preemptone(_p_) // 在系统调用的情况下,preemptone() 不会工作,因为P没有与之关联的M。 sysretake = true } } // 因为此时P的状态是 _Psyscall,所以是调用过了Syscall(或者Syscall6)开头的 entersyscall 函数,而此函数会解绑P和M,所以 p.m = 0;m.p=0。 if s == _Psyscall { ...... // p的local队列为空 && (存在自旋的m || 存在空闲的p) && 距离上次系统调用不超过10ms ==> 不需要继续执行 if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } ...... // p的状态更改为空闲 if atomic.Cas(&_p_.status, s, _Pidle) { ...... n++ _p_.syscalltick++ handoffp(_p_) // 尝试为p寻找一个m(startm),如果没有寻找到则 pidleput } ...... } } unlock(&allpLock) return uint32(n)}
preemptone
- 协作式抢占调度:设置抢占调度的标记,在下次进行函数调用前会检查此标记,然后调用 runtime.morestack_noctxt 最终抢占当前G
- 基于信号的异步抢占:给运行时间过长的G的M线程发送 _SIGURG。使其收到信号后执行 doSigPreempt 最终抢占当前G
func preemptone(_p_ *p) bool { mp := _p_.m.ptr() if mp == nil || mp == getg().m { return false } gp := mp.curg if gp == nil || gp == mp.g0 { return false } gp.preempt = true // 设置抢占标记 gp.stackguard0 = stackPreempt // 设置为一个大于任何真实sp的值。 // 基于信号的异步的抢占调度 if preemptMSupported && debug.asyncpreemptoff == 0 { _p_.preempt = true preemptM(mp) } return true}
协作式抢占调度
golang的编译器一般会在函数的汇编代码前后自动添加栈是否需要扩张的检查代码。
0x0000000000458360 <+0>: mov %fs:0xfffffffffffffff8,%rcx # 将当前g的指针存入rcx。tls还记得么? 0x0000000000458369 <+9>: cmp 0x10(%rcx),%rsp # 比较g.stackguard0和rsp。g结构体地址偏移16个字节就是g.stackguard0。 0x000000000045836d <+13>: jbe 0x4583b0 <main.caller+80> # 如果rsp较小,表示栈有溢出风险,调用runtime.morestack_noctxt // 此处省略具体函数汇编代码 0x00000000004583b0 <+80>: callq 0x451b30 <runtime.morestack_noctxt> 0x00000000004583b5 <+85>: jmp 0x458360 <main.caller>
假设上面的汇编代码是属于一个叫 caller 的函数的(实际上确实是的)。
当运行caller的G(暂且称其为gp)由于运行时间过长,被监控线程sysmon通过preemptone函数标记其 gp.preempt = true;gp.stackguard0 = stackPreempt。
当caller被调用时,会先进行栈的检查,因为 stackPreempt 是一个大于任何真实sp的值,所以jbe指令跳转调用 runtime.morestack_noctxt 。
goschedImpl
goschedImpl是抢占调度的关键逻辑,从 morestack_noctxt 到 goschedImpl 的调用链如下:
morestack_noctxt->morestack->newstack->gopreempt_m->goschedImpl。其中 morestack_noctxt 和 morestack 由汇编编写。
goschedImpl 的主要逻辑是:
- 更改gp的状态为_Grunable,dropg解绑G和M
- globrunqput放入全局队列
- schedule重新调度
func newstack() { thisg := getg() ...... gp := thisg.m.curg preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt ...... if preempt { ...... // Act like goroutine called runtime.Gosched. gopreempt_m(gp) // never return }}func gopreempt_m(gp *g) { if trace.enabled { traceGoPreempt() } goschedImpl(gp)}func goschedImpl(gp *g) { status := readgstatus(gp) if status&^_Gscan != _Grunning { dumpgstatus(gp) throw("bad g status") } casgstatus(gp, _Grunning, _Grunnable) // 状态从 _Grunning 改为 _Grunnable。你运行的太久了,下来吧你。 dropg() // 解绑G和M lock(&sched.lock) globrunqput(gp) // 放入全局队列 unlock(&sched.lock) schedule() // 重新调度,进入调度循环。}
基于信号的异步抢占
上述的 preemptone函数会调用preemptM函数,并且最终会调用tgkill系统调用,向需要被抢占的G所在的工作线程发送 _SIGURG 信号。
发送信号
func preemptM(mp *m) { ...... signalM(mp, sigPreempt)}// signalM sends a signal to mp.func signalM(mp *m, sig int) { tgkill(getpid(), int(mp.procid), sig)}TEXT ·tgkill(SB),NOSPLIT,$0 MOVQ tgid+0(FP), DI MOVQ tid+8(FP), SI MOVQ sig+16(FP), DX MOVL $SYS_tgkill, AX SYSCALL RET
执行抢占
内核在收到 _SIGURG 信号后,会调用该线程注册的信号处理程序,最终会执行到以下程序。
因为注册逻辑不是问题的关注核心,所以就放在后面有介绍。
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) { _g_ := getg() c := &sigctxt{info, ctxt} ...... if sig == sigPreempt { // const sigPreempt doSigPreempt(gp, c) } ......}func doSigPreempt(gp *g, ctxt *sigctxt) { // Check if this G wants to be preempted and is safe to // preempt. if wantAsyncPreempt(gp) && isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()) { // Inject a call to asyncPreempt. ctxt.pushCall(funcPC(asyncPreempt)) } // Acknowledge the preemption. atomic.Xadd(&gp.m.preemptGen, 1)}func (c *sigctxt) pushCall(targetPC uintptr) { // Make it look like the signaled instruction called target. pc := uintptr(c.rip()) sp := uintptr(c.rsp()) sp -= sys.PtrSize *(*uintptr)(unsafe.Pointer(sp)) = pc c.set_rsp(uint64(sp)) c.set_rip(uint64(targetPC)) // pc指向asyncPreempt}// asyncPreempt->asyncPreempt2func asyncPreempt2() { gp := getg() gp.asyncSafePoint = true if gp.preemptStop { mcall(preemptPark) } else { mcall(gopreempt_m) } gp.asyncSafePoint = false}// gopreempt_m里调用了goschedImpl,这个函数上面分析过,是完成抢占的关键。此时也就是完成了抢占,进入调度循环。func gopreempt_m(gp *g) { if trace.enabled { traceGoPreempt() } goschedImpl(gp)}
信号处理程序的注册与执行
注册
m0的信号处理程序是在整个程序一开始就在 mstart1 中开始注册的。
而其他M所属线程因为在clone的时候指定了 _CLONE_SIGHAND 标记,共享了信号handler table。所以一出生就有了。
注册逻辑如下:
// 省略了一些无关代码func mstart1() { if _g_.m == &m0 { mstartm0() }}func mstartm0() { initsig(false)}// 循环注册信号处理程序func initsig(preinit bool) { for i := uint32(0); i < _NSIG; i++ { ...... setsig(i, funcPC(sighandler)) }}// sigtramp注册为处理程序func setsig(i uint32, fn uintptr) { var sa sigactiont sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTORER | _SA_RESTART sigfillset(&sa.sa_mask) if GOARCH == "386" || GOARCH == "amd64" { sa.sa_restorer = funcPC(sigreturn) } if fn == funcPC(sighandler) { if iscgo { fn = funcPC(cgoSigtramp) } else { fn = funcPC(sigtramp) } } sa.sa_handler = fn sigaction(i, &sa, nil)}// sigaction->sysSigaction->rt_sigaction// 调用rt_sigaction系统调用,注册处理程序TEXT runtime·rt_sigaction(SB),NOSPLIT,$0-36 MOVQ sig+0(FP), DI MOVQ new+8(FP), SI MOVQ old+16(FP), DX MOVQ size+24(FP), R10 MOVL $SYS_rt_sigaction, AX SYSCALL MOVL AX, ret+32(FP) RET
以上逻辑主要作用就是循环注册 _NSIG(65) 个信号处理程序,其实都是 sigtramp 函数。操作系统内核在收到信号后会调用此函数。
执行
sigtramp是入口,sighandler根据不同信号调用处理程序。
TEXT runtime·sigtramp(SB),NOSPLIT,$72 ...... MOVQ DX, ctx-56(SP) MOVQ SI, info-64(SP) MOVQ DI, signum-72(SP) MOVQ $runtime·sigtrampgo(SB), AX CALL AX ...... RETfunc sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) { ...... c := &sigctxt{info, ctx} g := sigFetchG(c) // getg() ...... sighandler(sig, info, ctx, g) ......}
文章来源:智云一二三科技
文章标题:Golang源码学习:监控线程
文章地址:https://www.zhihuclub.com/1035.shtml