单核CPU,开两个 goroutine
,其中一个死循环,会怎么样?
答案是:死循环的 goroutine
卡住了,但是完全不影响另一个 goroutine
和主 goroutine
的运行。
go1.14 版本实现了基于信号的抢占式调度,可以在 goroutine
执行时间过长的时候强制让出调度,执行其他的 goroutine
。接下来看看具体怎么实现的。基于 go1.15 linux amd64
。
先看一个常规的例子
Copy func main () {
// 只有一个 P 了,目前运行主 goroutine
runtime. GOMAXPROCS ( 1 )
// 创建一个 G1 , runtime.newproc -> runqput(_p_, newg, true) 放入到 P.runnext 字段
go f1 ()
// 因为只有一个 P 主 goroutine继续运行
// 创建一个 G2 , 同上面,也是加入到队列头部,这时候本地队列的顺序就是 G2 G1
go f2 ()
// 等待 f1 f2的执行, gopark 主 goroutine, GMP 调度可运行的 G
// 按顺序调用 G2 G1
// 所以不管执行多少次,结果都是
// This is f2
// This is f1
// success
time. Sleep ( 100 * time.Millisecond)
fmt. Println ( "success" )
}
如果将 runtime.GOMAXPROCS(1)
改成 runtime.GOMAXPROCS(4)
即多核CPU,你就会发现 f1
和 f2
交替执行,没有明确的先后。因为此时有4个 P
,两个 M
可以分别运行自己的 P
并行执行 f1
和 f2
。
既然每次都是 f2
先执行,那在 f2
中加入一个死循环会怎么样呢?
Copy func f1 () {
fmt. Println ( "This is f1" )
}
func f2 () {
// 死循环
for {
}
fmt. Println ( "This is f2" )
}
func main () {
runtime. GOMAXPROCS ( 1 )
go f1 ()
go f2 ()
time. Sleep ( 100 * time.Millisecond)
fmt. Println ( "success" )
}
// This is f1
// success
这段代码在 go 1.14
中可以正常执行,但 go 1.13
中则会卡住。因为之前版本在执行栈容量检查时才会检查是否需要抢占,对于 f2
里这种简单的 for
循环不会触发栈容量检查,就会导致不触发抢占。
强制抢占
操作系统使用的是强制抢占,即线程执行超过一定时间后,不管是否执行完,强制换上别的线程,以实现公平
Go 1.2 协作式调度 / 请求式抢占
Go 的抢占调度实现原理就是给 G
里放了一个标记字段,标记它是否可以被抢占。执行前检查这个字段,如果为 true
,就表示可以暂停它,让出当前 CPU
给别的协程
Go 调度在 go1.2 实现了抢占,应该更精确的称为请求式抢占,那是因为 Go 调度器的抢占和 OS 的线程抢占比起来很柔和,不暴力,不会说线程时间片到了,或者更高优先级的任务到了,强制暂停当前任务给其他的任务。
调度器在启动的时候会启动一个单独的线程 sysmon
,它负责所有的监控工作,其中1项就是抢占,发现满足抢占条件的 G
时,就发出抢占请求。
实现机制
编译阶段,编译器会在一些有明显消耗的函数头部插入一些栈增长检测代码,用来做扩栈和调度相关的判断。
函数执行时,通过检测 g.stackguard0 == 0
,来判断是否要扩栈。该标志也用来判断是否该做调度,如果 g.stackguard0 == stackPreempt
,则执行调度。(在 Go1.14 里使用 g.preempt
字段代替)
调度时还会检查 gcwating
标志,如果 gcwating == 1
, (表示 GC
正在等待执行 STW
操作),便会让出当前 P
因此,对于非函数调用 (例如某个 G
执行的是简单 for
循环),即使设置了抢占标志,该 G
也不会执行栈扩容检测,就会一直霸占 M
,无法完成调度
Go 1.14 信号式抢占
程序启动时,会注册函数,监听 SIGURG
信号并绑定处理方法 runtime.doSigPreempt
监听函数收到 SIGURG
信号,执行调度 schedule()
调度时机
Go
后台监控 runtime.sysmon
检测超时发送抢占信号
Go GC
栈扫描发送抢占信号
对 GC Root
进行标记的时候会扫描 G
的栈,扫描之前会调用 suspendG
挂起 G
的执行才进行扫描
会将处于运行状态的 G
的 preemptStop
标记成为 true
,并调用 runtime.preemptM
会调用 runtime.signalM
向线程发送信号 SIGURG
Go GC STW
的时候调用 preemptall
抢占所有 P
,让其暂停
源码
sysmon
调度器在启动的时候会启动一个单独的后台监控线程 sysmon
,它负责所有的监控工作,其中一项就是抢占,发现满足抢占条件的 G
时,就发出抢占请求。
sysmon
每隔段时间就会检查一次,间隔时间从 20us
递增,最多 10ms
。成功调度后会恢复回 20us
。
如果 sysmon
发现一个协程执行超过 10ms
了,就会发出抢占信号 SIGURG
。
此外,程序在启动时会启动函数监听 SIGURG
。收到这个信号后,执行 runtime.schedule()
函数触发调度。
Copy func sysmon () {
//无限循环
for {
//渐进式休眠,最短20us,最长10ms
if idle == 0 {
delay = 20
} else if idle > 50 { //前50轮休眠时间都是20us,50轮后double
delay *= 2
}
if delay > 10 * 1000 { // up to 10ms
delay = 10 * 1000
}
usleep (delay) //休眠
// retake 对所有的 G 进行重整
if retake (now) != 0 {
idle = 0 //retake成功则重置轮数
} else {
idle ++ //否则轮数+1
}
}
}
//retake 对所有的 G 进行重整,发起抢占
func retake (now int64 ) uint32 {
//加锁
lock ( & allpLock)
for i := 0 ; i < len (allp); i ++ {
_p_ := allp[i]
pd := & _p_.sysmontick
s := _p_.status
if s == _Psyscall {
// P 处于系统调用之中,抢占检查条件比较复杂,略
// 与下面的区别是,syscall 本就是通过 g0 无 P 执行的,因此这个 P 此时挂在这个 M 上是浪费的,需要把这个 P 给别的 M
} else if s == _Prunning { // P处于运行状态,检查其是否运行得太久了
// forcePreemptNS = 10ms,如果协程独占P超过10ms,就需要进行抢占了
if pd.schedwhen + forcePreemptNS <= now {
preemptone (_p_) // 修改 G 的标记为可被抢占
sysretake = true
}
}
}
}
发送调度信号
preemptone()
里修改了 G
的 preempt
字段为可被抢占,然后调用 signalM
发出信号量
Copy //抢占
func preemptone (_p_ * p ) bool {
mp := _p_.m. ptr ()
gp := mp.curg
// 标记 G 的抢占标识为 true
// (在1.14之前的版本协作式抢占中,这个标记设置好了,就只能干等着,直到 morestask (函数调用)或者gopark(IO操作,例如channel之类)才会检查)
gp.preempt = true
// 1.14及之后,会直接调用 preemptM 信号通知线程,即所谓信号式抢占
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM (mp)
}
}
//发出抢占信号
func preemptM (mp * m ) {
// CAS保证并发安全
if atomic. Cas ( & mp.signalPending, 0 , 1 ) {
//发出信号
//const sigPreempt = _SIGURG,SIGURG是Linux下几乎没用的一个信号量,被go拿来当作抢占信号了
signalM (mp, sigPreempt)
}
}
接收调度信号
程序初始化时,会注册函数监听 SIGURG
信号
Copy //程序初始化时,监听SIGURG信号
//src/runtime/proc.go
func mstartm0 () {
initsig ( false )
}
//src/runtime/signal_unix.go
func initsig (preinit bool ) {
// 对于一个需要设置 sighandler 的信号,会通过 setsig 来设置信号对应的动作(action):
setsig (i, funcPC (sighandler))
}
//runtime/preempt.go
//这个函数里包含了对各种信号量的处理,其中就包括SIGURG
func sighandler (sig uint32 , info * siginfo , ctxt unsafe . Pointer , gp * g ) {
if sig == sigPreempt && debug.asyncpreemptoff == 0 {
doSigPreempt (gp, c)
}
}
func doSigPreempt (gp * g , ctxt * sigctxt ) {
// 判断异步安全点
if wantAsyncPreempt (gp) {
if ok, newpc := isAsyncSafePoint (gp, ctxt. sigpc (), ctxt. sigsp (), ctxt. siglr ()); ok {
// Adjust the PC and inject a call to asyncPreempt.
ctxt. pushCall ( funcPC (asyncPreempt), newpc)
}
}
// Acknowledge the preemption.
atomic. Xadd ( & gp.m.preemptGen, 1 )
atomic. Store ( & gp.m.signalPending, 0 )
}
// asyncPreempt saves all user registers and calls asyncPreempt2.
// When stack scanning encounters an asyncPreempt frame, it scans that
// frame and its parent frame conservatively.
// 这个函数是用汇编实现的,实际调用了asyncPreempt2
func asyncPreempt ()
//go:nosplit
func asyncPreempt2 () {
gp := getg ()
gp.asyncSafePoint = true
if gp.preemptStop { // GC触发的抢占。GC时,会暂停所有G的抢占,所以叫 preemptStop。只有 GC 自己能抢占
mcall (preemptPark) // 里面调用的 runtime.schedule()
} else {
mcall (gopreempt_m) // 里面调用的 goschedImpl,goschedImpl 调用的runtime.schedule()
}
gp.asyncSafePoint = false
}
收到信号后的处理
在 asyncPreempt2()
里,如果是 GC 触发的抢占,就执行 preemptPark
,否则执行 gopreempt_m
。里面最终都会调用 schedule()
preemptPark
方法会解绑 M
和 G
的关系,封存当前协程,执行 runtime.schedule()
来重新调度,获取可执行的协程,至于被抢占的协程后面会去重启。
Copy // runtime/proc.go
// preemptPark parks gp and puts it in _Gpreempted.
func preemptPark (gp * g ) {
casGToPreemptScan (gp, _Grunning, _Gscan | _Gpreempted) // 修改 G 的状态为抢占中
dropg () // 解绑 M 和 G
casfrom_Gscanstatus (gp, _Gscan | _Gpreempted, _Gpreempted) // 修改 G 的状态为被抢占
schedule () // 调度
}
// 解绑 M 和 G
func dropg () {
_g_ := getg ()
setMNoWB ( & _g_.m.curg.m, nil ) // 将 G 的 M 设置为 nil
setGNoWB ( & _g_.m.curg, nil ) // 将 M 的 curg 设置为 nil
}
goschedImpl
操作就简单的多,把当前协程的状态从 _Grunning
正在执行改成 _Grunnable
可执行,使用 globrunqput
方法把抢占的协程放到全局队列里,根据 MPG
的协程调度设计,全局队列要后于本地队列被调度。
Copy func gopreempt_m (gp * g ) {
goschedImpl (gp)
}
func goschedImpl (gp * g ) {
casgstatus (gp, _Grunning, _Grunnable) //修改G的状态为可运行
dropg () //解绑M和G
lock ( & sched.lock) //加锁
globrunqput (gp) //把g放入全局队列
unlock ( & sched.lock) //解锁
schedule () //调度
}
参考
深度解密Go语言之 scheduler
单核CPU下Go语言调度及抢占式调度的实现
golang 异步抢占例子
抢占式调度
峰云就她了-go1.14基于信号的抢占式调度实现原理
查漏补缺之Goroutine的调度
朴素的心态-单核CPU下Go语言调度及抢占式调度的实现