Golang源码学习:监控线程

监控线程是在runtime.main执行的时候在系统栈中创建的,监控线程与普通的工作线程区别在于,监控线程不需要绑定p来运行。

监控线程的创建与启动

简单的调用图

先给出个简单的调用图,好心里有数,逐个分析完后做个小结。

主体代码

以下会合并小篇幅且易懂的代码段,个人认为重点的会单独摘出来。

main->newm->newm1->newosproc

func main() {
        ......
	if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
		systemstack(func() {
			newm(sysmon, nil)
		})
	}
        ......
}

func newm(fn func(), _p_ *p) {
	mp := allocm(_p_, fn)	// 分配一个m
	mp.nextp.set(_p_)
	mp.sigmask = initSigmask
	......
	newm1(mp)
}

func newm1(mp *m) {
	......
	execLock.rlock() // Prevent process clone.
	newosproc(mp)
	execLock.runlock()
}

cloneFlags = _CLONE_VM | /* share memory */
		_CLONE_FS | /* share cwd, etc */
		_CLONE_FILES | /* share fd table */
		_CLONE_SIGHAND | /* share sig handler table */
		_CLONE_SYSVSEM | /* share SysV semaphore undo lists (see issue #20763) */
		_CLONE_THREAD /* revisit - okay for now */

func newosproc(mp *m) {
	stk := unsafe.Pointer(mp.g0.stack.hi)
        ......
	sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
        // 这里注意一下,mstart会被作为工作线程的开始,在runtime.clone中会被调用。
	ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
	sigprocmask(_SIG_SETMASK, &oset, nil)
        ......
}

allocm

在此场景中其工作是new一个m,m.mstartfn = sysmon。

分配一个g与mp相互绑定,这个g就是mp的g0。但不是全局变量的那个g0,全局变量g0是m0的m.g0。

func allocm(_p_ *p, fn func()) *m {
	_g_ := getg()
	acquirem() // disable GC because it can be called from sysmon
	// 忽略sysmon不会执行的代码
	mp := new(m)	// 新建一个m
	mp.mstartfn = fn	// fn 指向 sysmon
	mcommoninit(mp)	// 之前的文章有分析过,做一些初始化工作。

	if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {
		mp.g0 = malg(-1)
	} else {
		mp.g0 = malg(8192 * sys.StackGuardMultiplier)	// 分配一个g
	}
	mp.g0.m = mp
        ......
	releasem(_g_.m)
	return mp
}

runtime.clone

调用clone,内核会创建出一个子线程,返回两次。返回0是子线程,否则是父线程。

效果与fork类似,其实是fork封装了clone。

// int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void));
TEXT runtime·clone(SB),NOSPLIT,$0
        // 准备clone系统调用的参数
	MOVL	flags+0(FP), DI
	MOVQ	stk+8(FP), SI
	MOVQ	$0, DX
	MOVQ	$0, R10

	// 从父进程栈复制mp, gp, fn。子线程会用到。
	MOVQ	mp+16(FP), R8
	MOVQ	gp+24(FP), R9
	MOVQ	fn+32(FP), R12

        // 调用clone
	MOVL	$SYS_clone, AX
	SYSCALL

	// 父线程,返回.
	CMPQ	AX, $0
	JEQ	3(PC)
	MOVL	AX, ret+40(FP)
	RET

	// 子线程,设置栈顶
	MOVQ	SI, SP

	// If g or m are nil, skip Go-related setup.
	CMPQ	R8, $0    // m
	JEQ	nog
	CMPQ	R9, $0    // g
	JEQ	nog

	// 调用系统调用 gettid 获取线程id初始化 mp.procid
	MOVL	$SYS_gettid, AX
	SYSCALL
	MOVQ	AX, m_procid(R8)

	// 设置线程tls
	LEAQ	m_tls(R8), DI
	CALL	runtime·settls(SB)

	// In child, set up new stack
	get_tls(CX)
	MOVQ	R8, g_m(R9)	// gp.m = mp
	MOVQ	R9, g(CX)	// mp.tls[0] = gp
	CALL	runtime·stackcheck(SB)

nog:
	// Call fn
	CALL	R12		// 调用fn,此处是mstart,永不返回。

	// It shouldn't return. If it does, exit that thread.
	MOVL	$111, DI
	MOVL	$SYS_exit, AX
	SYSCALL
	JMP	-3(PC)	// keep exiting

总结一下clone的工作:

  • 准备系统调用clone的参数
  • 将mp,gp,fn从父线程栈复制到寄存器中,给子线程用
  • 调用clone
  • 父线程返回
  • 子线程设置 m.procid、tls、gp,mp互相绑定、调用fn

调用sysmon

在newosproc中调用clone,并将 mstart 的地址传入。也就是整个线程开始执行。

mstart 与 mstart1 在之前的文章有分析过,现在来看一下与本文有关的段落。

func mstart1() {
	_g_ := getg()
	save(getcallerpc(), getcallersp())
	asminit()
	minit()
        // 之前初始化时的调用逻辑是 rt0_go->mstart->mstart1,当时这里的fn == nil。所以会继续向下走,进入调度循环。
        // 现在调用逻辑是通过 newm(sysmon, nil)->allocm 中设置了 mp.mstartfn 为 sysmon的指针。所以下面的 fn 就不是 nil 了
        // fn != nil 调用 sysmon,并且sysmon永不会返回。也就是说不会走到下面schedule中。
	if fn := _g_.m.mstartfn; fn != nil {
		fn()
	}
        ......
	schedule()
}

小结

监控线程通过在runtime.main中调用newm(sysmon, nil)创建。

  • newm:调用了allocm 获得了mp。
  • allocm:new了一个m,也就是前面的mp。并且将 mp.mstartfn 赋值为 sysmon的指针,这很重要,后面会用。
  • newm->newm1->newosproc->runtime.clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
  • runtime.clone:准备系统调用clone的参数;从父线程栈复制mp,gp,fn到寄存器;调用clone;父线程返回;子线程设置sp,m.procid,tls,互相绑定mp与gp。调用mstart作为子线程的开始执行。
  • mstart->mstart1:调用 _g_.m.mstartfn 指向的函数,也就是sysmon,此时监控工作正式开始。

抢占调度

主体代码

sysmon开始会检查死锁,接下来是函数主体,一个无限循环,每隔一个短时间执行一次。其工作包含网络轮询、抢占调度、垃圾回收。

sysmon中抢占调度代码

func sysmon() {
	lock(&sched.lock)
	sched.nmsys++
	checkdead()
	unlock(&sched.lock)

	lasttrace := int64(0)
	idle := 0 // how many cycles in succession we had not wokeup somebody
	delay := uint32(0)	// 睡眠时间,开始是20微秒;idle大于50后,翻倍增长;但最大为10毫秒
	for {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)
		now := nanotime()
		......
		// retake P's blocked in syscalls
		// and preempt long running G's
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
                ......
	}
}

retake

  • preemptone:抢占运行时间过长的G。
  • handoffp:尝试为过长时间处在_Psyscall的P关联一个M继续调度。
func retake(now int64) uint32 {
	n := 0
	lock(&allpLock)
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false             
		if s == _Prunning || s == _Psyscall {
                        // 如果运行时间太长,则抢占g
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
				preemptone(_p_)	// 在系统调用的情况下,preemptone() 不会工作,因为P没有与之关联的M。
				sysretake = true
			}
		}
                // 因为此时P的状态是 _Psyscall,所以是调用过了Syscall(或者Syscall6)开头的 entersyscall 函数,而此函数会解绑P和M,所以 p.m = 0;m.p=0。
		if s == _Psyscall {
			......
                        // p的local队列为空 && (存在自旋的m || 存在空闲的p) && 距离上次系统调用不超过10ms ==> 不需要继续执行
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			......
                        // p的状态更改为空闲
			if atomic.Cas(&_p_.status, s, _Pidle) {
				......
				n++
				_p_.syscalltick++
				handoffp(_p_) // 尝试为p寻找一个m(startm),如果没有寻找到则 pidleput
			}
			......
		}
	}
	unlock(&allpLock)
	return uint32(n)
}

preemptone

  • 协作式抢占调度:设置抢占调度的标记,在下次进行函数调用前会检查此标记,然后调用 runtime.morestack_noctxt 最终抢占当前G
  • 基于信号的异步抢占:给运行时间过长的G的M线程发送 _SIGURG。使其收到信号后执行 doSigPreempt 最终抢占当前G
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}
	gp.preempt = true	// 设置抢占标记
	gp.stackguard0 = stackPreempt	// 设置为一个大于任何真实sp的值。

	// 基于信号的异步的抢占调度
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}
	return true
}

协作式抢占调度

golang的编译器一般会在函数的汇编代码前后自动添加栈是否需要扩张的检查代码。

   0x0000000000458360 <+0>:     mov    %fs:0xfffffffffffffff8,%rcx  # 将当前g的指针存入rcx。tls还记得么?
   0x0000000000458369 <+9>:     cmp    0x10(%rcx),%rsp              # 比较g.stackguard0和rsp。g结构体地址偏移16个字节就是g.stackguard0。
   0x000000000045836d <+13>:    jbe    0x4583b0 <main.caller+80>    # 如果rsp较小,表示栈有溢出风险,调用runtime.morestack_noctxt
   // 此处省略具体函数汇编代码
   0x00000000004583b0 <+80>:	callq  0x451b30 <runtime.morestack_noctxt>
   0x00000000004583b5 <+85>:	jmp    0x458360 <main.caller>

假设上面的汇编代码是属于一个叫 caller 的函数的(实际上确实是的)。

当运行caller的G(暂且称其为gp)由于运行时间过长,被监控线程sysmon通过preemptone函数标记其 gp.preempt = true;gp.stackguard0 = stackPreempt。

当caller被调用时,会先进行栈的检查,因为 stackPreempt 是一个大于任何真实sp的值,所以jbe指令跳转调用 runtime.morestack_noctxt 。

goschedImpl

goschedImpl是抢占调度的关键逻辑,从 morestack_noctxt 到 goschedImpl 的调用链如下:

morestack_noctxt->morestack->newstack->gopreempt_m->goschedImpl。其中 morestack_noctxt 和 morestack 由汇编编写。

goschedImpl 的主要逻辑是:

  • 更改gp的状态为_Grunable,dropg解绑G和M
  • globrunqput放入全局队列
  • schedule重新调度
func newstack() {
	thisg := getg()
	......
	gp := thisg.m.curg
        preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
        ......
        if preempt {
		......
		// Act like goroutine called runtime.Gosched.
		gopreempt_m(gp) // never return
	}
}

func gopreempt_m(gp *g) {
	if trace.enabled {
		traceGoPreempt()
	}
	goschedImpl(gp)
}

func goschedImpl(gp *g) {
	status := readgstatus(gp)
	if status&^_Gscan != _Grunning {
		dumpgstatus(gp)
		throw("bad g status")
	}
	casgstatus(gp, _Grunning, _Grunnable)	// 状态从 _Grunning 改为 _Grunnable。你运行的太久了,下来吧你。
	dropg()		// 解绑G和M
	lock(&sched.lock)
	globrunqput(gp)	// 放入全局队列
	unlock(&sched.lock)
	schedule()	// 重新调度,进入调度循环。
}

基于信号的异步抢占

上述的 preemptone函数会调用preemptM函数,并且最终会调用tgkill系统调用,向需要被抢占的G所在的工作线程发送 _SIGURG 信号。

发送信号

func preemptM(mp *m) {
	......
	signalM(mp, sigPreempt)
}
// signalM sends a signal to mp.
func signalM(mp *m, sig int) {
	tgkill(getpid(), int(mp.procid), sig)
}

TEXT ·tgkill(SB),NOSPLIT,$0
	MOVQ	tgid+0(FP), DI
	MOVQ	tid+8(FP), SI
	MOVQ	sig+16(FP), DX
	MOVL	$SYS_tgkill, AX
	SYSCALL
	RET

执行抢占

内核在收到 _SIGURG 信号后,会调用该线程注册的信号处理程序,最终会执行到以下程序。

因为注册逻辑不是问题的关注核心,所以就放在后面有介绍。

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
	_g_ := getg()
	c := &sigctxt{info, ctxt}
        ......
	if sig == sigPreempt {    // const sigPreempt
		doSigPreempt(gp, c)
	}
        ......
}

func doSigPreempt(gp *g, ctxt *sigctxt) {
	// Check if this G wants to be preempted and is safe to
	// preempt.
	if wantAsyncPreempt(gp) && isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()) {
		// Inject a call to asyncPreempt.
		ctxt.pushCall(funcPC(asyncPreempt))
	}

	// Acknowledge the preemption.
	atomic.Xadd(&gp.m.preemptGen, 1)
}

func (c *sigctxt) pushCall(targetPC uintptr) {
	// Make it look like the signaled instruction called target.
	pc := uintptr(c.rip())
	sp := uintptr(c.rsp())
	sp -= sys.PtrSize
	*(*uintptr)(unsafe.Pointer(sp)) = pc
	c.set_rsp(uint64(sp))
	c.set_rip(uint64(targetPC)) // pc指向asyncPreempt
}

// asyncPreempt->asyncPreempt2
func asyncPreempt2() {
	gp := getg()
	gp.asyncSafePoint = true
	if gp.preemptStop {
		mcall(preemptPark)
	} else {
		mcall(gopreempt_m)
	}
	gp.asyncSafePoint = false
}

// gopreempt_m里调用了goschedImpl,这个函数上面分析过,是完成抢占的关键。此时也就是完成了抢占,进入调度循环。
func gopreempt_m(gp *g) {
	if trace.enabled {
		traceGoPreempt()
	}
	goschedImpl(gp)
}


信号处理程序的注册与执行

注册

m0的信号处理程序是在整个程序一开始就在 mstart1 中开始注册的。

而其他M所属线程因为在clone的时候指定了 _CLONE_SIGHAND 标记,共享了信号handler table。所以一出生就有了。

注册逻辑如下:

// 省略了一些无关代码
func mstart1() {
	if _g_.m == &m0 {
		mstartm0()
	}
}

func mstartm0() {
	initsig(false)
}

// 循环注册信号处理程序
func initsig(preinit bool) {
	for i := uint32(0); i < _NSIG; i++ {
                ......
		setsig(i, funcPC(sighandler))
	}
}

// sigtramp注册为处理程序
func setsig(i uint32, fn uintptr) {
	var sa sigactiont
	sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTORER | _SA_RESTART
	sigfillset(&sa.sa_mask)
	if GOARCH == "386" || GOARCH == "amd64" {
		sa.sa_restorer = funcPC(sigreturn)
	}
	if fn == funcPC(sighandler) {
		if iscgo {
			fn = funcPC(cgoSigtramp)
		} else {
			fn = funcPC(sigtramp)
		}
	}
	sa.sa_handler = fn
	sigaction(i, &sa, nil)
}

// sigaction->sysSigaction->rt_sigaction
// 调用rt_sigaction系统调用,注册处理程序
TEXT runtime·rt_sigaction(SB),NOSPLIT,$0-36
	MOVQ	sig+0(FP), DI
	MOVQ	new+8(FP), SI
	MOVQ	old+16(FP), DX
	MOVQ	size+24(FP), R10
	MOVL	$SYS_rt_sigaction, AX
	SYSCALL
	MOVL	AX, ret+32(FP)
	RET

以上逻辑主要作用就是循环注册 _NSIG(65) 个信号处理程序,其实都是 sigtramp 函数。操作系统内核在收到信号后会调用此函数。

执行

sigtramp是入口,sighandler根据不同信号调用处理程序。

TEXT runtime·sigtramp(SB),NOSPLIT,$72
        ......
	MOVQ	DX, ctx-56(SP)
	MOVQ	SI, info-64(SP)
	MOVQ	DI, signum-72(SP)
	MOVQ	$runtime·sigtrampgo(SB), AX
	CALL AX
        ......
	RET

func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) {
        ......
        c := &sigctxt{info, ctx}
	g := sigFetchG(c) // getg()
        ......
	sighandler(sig, info, ctx, g)
        ......
}


发表评论

电子邮件地址不会被公开。 必填项已用*标注