5-5.调度-抢占
单核CPU,开两个 goroutine,其中一个死循环,会怎么样?
答案是:死循环的 goroutine 卡住了,但是完全不影响另一个 goroutine 和主 goroutine 的运行。
go1.14 版本实现了基于信号的抢占式调度,可以在 goroutine 执行时间过长的时候强制让出调度,执行其他的 goroutine。接下来看看具体怎么实现的。基于 go1.15 linux amd64。
先看一个常规的例子
func main() {
// 只有一个 P 了,目前运行主 goroutine
runtime.GOMAXPROCS(1)
// 创建一个 G1 , runtime.newproc -> runqput(_p_, newg, true) 放入到 P.runnext 字段
go f1()
// 因为只有一个 P 主 goroutine继续运行
// 创建一个 G2 , 同上面,也是加入到队列头部,这时候本地队列的顺序就是 G2 G1
go f2()
// 等待 f1 f2的执行, gopark 主 goroutine, GMP 调度可运行的 G
// 按顺序调用 G2 G1
// 所以不管执行多少次,结果都是
// This is f2
// This is f1
// success
time.Sleep(100 * time.Millisecond)
fmt.Println("success")
}如果将 runtime.GOMAXPROCS(1) 改成 runtime.GOMAXPROCS(4) 即多核CPU,你就会发现 f1 和 f2 交替执行,没有明确的先后。因为此时有4个 P,两个 M 可以分别运行自己的 P 并行执行 f1 和 f2。
既然每次都是 f2 先执行,那在 f2 中加入一个死循环会怎么样呢?
func f1() {
fmt.Println("This is f1")
}
func f2() {
// 死循环
for {
}
fmt.Println("This is f2")
}
func main() {
runtime.GOMAXPROCS(1)
go f1()
go f2()
time.Sleep(100 * time.Millisecond)
fmt.Println("success")
}
// This is f1
// success这段代码在 go 1.14 中可以正常执行,但 go 1.13 中则会卡住。因为之前版本在执行栈容量检查时才会检查是否需要抢占,对于 f2 里这种简单的 for 循环不会触发栈容量检查,就会导致不触发抢占。
强制抢占
操作系统使用的是强制抢占,即线程执行超过一定时间后,不管是否执行完,强制换上别的线程,以实现公平
Go 1.2 协作式调度 / 请求式抢占
Go 的抢占调度实现原理就是给 G 里放了一个标记字段,标记它是否可以被抢占。执行前检查这个字段,如果为 true,就表示可以暂停它,让出当前 CPU 给别的协程
Go 调度在 go1.2 实现了抢占,应该更精确的称为请求式抢占,那是因为 Go 调度器的抢占和 OS 的线程抢占比起来很柔和,不暴力,不会说线程时间片到了,或者更高优先级的任务到了,强制暂停当前任务给其他的任务。
抢占请求需要满足 2个条件中的1个:
G进行系统调用超过20us
G运行超过10ms。
调度器在启动的时候会启动一个单独的线程
sysmon,它负责所有的监控工作,其中1项就是抢占,发现满足抢占条件的G时,就发出抢占请求。
实现机制
编译阶段,编译器会在一些有明显消耗的函数头部插入一些栈增长检测代码,用来做扩栈和调度相关的判断。
函数执行时,通过检测
g.stackguard0 == 0,来判断是否要扩栈。该标志也用来判断是否该做调度,如果g.stackguard0 == stackPreempt,则执行调度。(在 Go1.14 里使用g.preempt字段代替)调度时还会检查
gcwating标志,如果gcwating == 1, (表示GC正在等待执行STW操作),便会让出当前P
因此,对于非函数调用 (例如某个
G执行的是简单for循环),即使设置了抢占标志,该G也不会执行栈扩容检测,就会一直霸占M,无法完成调度
Go 1.14 信号式抢占
程序启动时,会注册函数,监听
SIGURG信号并绑定处理方法runtime.doSigPreempt需要调度时,发送
SIGURG信号监听函数收到
SIGURG信号,执行调度schedule()
调度时机
Go后台监控runtime.sysmon检测超时发送抢占信号Go GC栈扫描发送抢占信号对
GC Root进行标记的时候会扫描G的栈,扫描之前会调用suspendG挂起G的执行才进行扫描会将处于运行状态的
G的preemptStop标记成为true,并调用runtime.preemptM会调用runtime.signalM向线程发送信号SIGURG扫描完毕之后再次调用
resumeG恢复执行。
Go GC STW的时候调用preemptall抢占所有P,让其暂停
源码
sysmon
调度器在启动的时候会启动一个单独的后台监控线程 sysmon,它负责所有的监控工作,其中一项就是抢占,发现满足抢占条件的 G 时,就发出抢占请求。
sysmon 每隔段时间就会检查一次,间隔时间从 20us 递增,最多 10ms。成功调度后会恢复回 20us。
如果 sysmon 发现一个协程执行超过 10ms 了,就会发出抢占信号 SIGURG。
此外,程序在启动时会启动函数监听 SIGURG。收到这个信号后,执行 runtime.schedule() 函数触发调度。
func sysmon() {
//无限循环
for {
//渐进式休眠,最短20us,最长10ms
if idle == 0 {
delay = 20
} else if idle > 50 { //前50轮休眠时间都是20us,50轮后double
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay) //休眠
// retake 对所有的 G 进行重整
if retake(now) != 0 {
idle = 0 //retake成功则重置轮数
} else {
idle++ //否则轮数+1
}
}
}
//retake 对所有的 G 进行重整,发起抢占
func retake(now int64) uint32 {
//加锁
lock(&allpLock)
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
pd := &_p_.sysmontick
s := _p_.status
if s == _Psyscall {
// P 处于系统调用之中,抢占检查条件比较复杂,略
// 与下面的区别是,syscall 本就是通过 g0 无 P 执行的,因此这个 P 此时挂在这个 M 上是浪费的,需要把这个 P 给别的 M
} else if s == _Prunning { // P处于运行状态,检查其是否运行得太久了
// forcePreemptNS = 10ms,如果协程独占P超过10ms,就需要进行抢占了
if pd.schedwhen+forcePreemptNS <= now {
preemptone(_p_) // 修改 G 的标记为可被抢占
sysretake = true
}
}
}
}发送调度信号
preemptone() 里修改了 G 的 preempt 字段为可被抢占,然后调用 signalM 发出信号量
//抢占
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
gp := mp.curg
// 标记 G 的抢占标识为 true
// (在1.14之前的版本协作式抢占中,这个标记设置好了,就只能干等着,直到 morestask (函数调用)或者gopark(IO操作,例如channel之类)才会检查)
gp.preempt = true
// 1.14及之后,会直接调用 preemptM 信号通知线程,即所谓信号式抢占
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp)
}
}
//发出抢占信号
func preemptM(mp *m) {
// CAS保证并发安全
if atomic.Cas(&mp.signalPending, 0, 1) {
//发出信号
//const sigPreempt = _SIGURG,SIGURG是Linux下几乎没用的一个信号量,被go拿来当作抢占信号了
signalM(mp, sigPreempt)
}
}
接收调度信号
程序初始化时,会注册函数监听 SIGURG 信号
//程序初始化时,监听SIGURG信号
//src/runtime/proc.go
func mstartm0() {
initsig(false)
}
//src/runtime/signal_unix.go
func initsig(preinit bool) {
// 对于一个需要设置 sighandler 的信号,会通过 setsig 来设置信号对应的动作(action):
setsig(i, funcPC(sighandler))
}
//runtime/preempt.go
//这个函数里包含了对各种信号量的处理,其中就包括SIGURG
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
if sig == sigPreempt && debug.asyncpreemptoff == 0 {
doSigPreempt(gp, c)
}
}
func doSigPreempt(gp *g, ctxt *sigctxt) {
// 判断异步安全点
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
ctxt.pushCall(funcPC(asyncPreempt), newpc)
}
}
// Acknowledge the preemption.
atomic.Xadd(&gp.m.preemptGen, 1)
atomic.Store(&gp.m.signalPending, 0)
}
// asyncPreempt saves all user registers and calls asyncPreempt2.
// When stack scanning encounters an asyncPreempt frame, it scans that
// frame and its parent frame conservatively.
// 这个函数是用汇编实现的,实际调用了asyncPreempt2
func asyncPreempt()
//go:nosplit
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
if gp.preemptStop { // GC触发的抢占。GC时,会暂停所有G的抢占,所以叫 preemptStop。只有 GC 自己能抢占
mcall(preemptPark) // 里面调用的 runtime.schedule()
} else {
mcall(gopreempt_m) // 里面调用的 goschedImpl,goschedImpl 调用的runtime.schedule()
}
gp.asyncSafePoint = false
}收到信号后的处理
在 asyncPreempt2() 里,如果是 GC 触发的抢占,就执行 preemptPark,否则执行 gopreempt_m。里面最终都会调用 schedule()
preemptPark 方法会解绑 M 和 G 的关系,封存当前协程,执行 runtime.schedule() 来重新调度,获取可执行的协程,至于被抢占的协程后面会去重启。
// runtime/proc.go
// preemptPark parks gp and puts it in _Gpreempted.
func preemptPark(gp *g) {
casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted) // 修改 G 的状态为抢占中
dropg() // 解绑 M 和 G
casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted) // 修改 G 的状态为被抢占
schedule() // 调度
}
// 解绑 M 和 G
func dropg() {
_g_ := getg()
setMNoWB(&_g_.m.curg.m, nil) // 将 G 的 M 设置为 nil
setGNoWB(&_g_.m.curg, nil) // 将 M 的 curg 设置为 nil
}goschedImpl 操作就简单的多,把当前协程的状态从 _Grunning 正在执行改成 _Grunnable 可执行,使用 globrunqput 方法把抢占的协程放到全局队列里,根据 MPG 的协程调度设计,全局队列要后于本地队列被调度。
func gopreempt_m(gp *g) {
goschedImpl(gp)
}
func goschedImpl(gp *g) {
casgstatus(gp, _Grunning, _Grunnable) //修改G的状态为可运行
dropg() //解绑M和G
lock(&sched.lock) //加锁
globrunqput(gp) //把g放入全局队列
unlock(&sched.lock) //解锁
schedule() //调度
}参考
Last updated