5-5.调度-抢占

单核CPU,开两个 goroutine,其中一个死循环,会怎么样?

答案是:死循环的 goroutine 卡住了,但是完全不影响另一个 goroutine 和主 goroutine 的运行。

go1.14 版本实现了基于信号的抢占式调度,可以在 goroutine 执行时间过长的时候强制让出调度,执行其他的 goroutine。接下来看看具体怎么实现的。基于 go1.15 linux amd64

先看一个常规的例子

func main() {
    // 只有一个 P 了,目前运行主 goroutine
    runtime.GOMAXPROCS(1)

    // 创建一个 G1 , runtime.newproc -> runqput(_p_, newg, true) 放入到 P.runnext 字段
    go f1()

    // 因为只有一个 P 主 goroutine继续运行

    // 创建一个 G2 , 同上面,也是加入到队列头部,这时候本地队列的顺序就是 G2 G1
    go f2()

    // 等待 f1 f2的执行, gopark 主 goroutine, GMP 调度可运行的 G
    // 按顺序调用 G2 G1
    // 所以不管执行多少次,结果都是
    // This is f2
    // This is f1
    // success
    time.Sleep(100 * time.Millisecond)
    fmt.Println("success")
}

如果将 runtime.GOMAXPROCS(1) 改成 runtime.GOMAXPROCS(4) 即多核CPU,你就会发现 f1f2 交替执行,没有明确的先后。因为此时有4个 P,两个 M 可以分别运行自己的 P 并行执行 f1f2

既然每次都是 f2 先执行,那在 f2 中加入一个死循环会怎么样呢?

func f1() {
    fmt.Println("This is f1")
}

func f2() {
    // 死循环
    for {

    }
    fmt.Println("This is f2")
}

func main() {
    runtime.GOMAXPROCS(1)

    go f1()
    go f2()

    time.Sleep(100 * time.Millisecond)
    fmt.Println("success")
}

// This is f1
// success

这段代码在 go 1.14 中可以正常执行,但 go 1.13 中则会卡住。因为之前版本在执行栈容量检查时才会检查是否需要抢占,对于 f2 里这种简单的 for 循环不会触发栈容量检查,就会导致不触发抢占。

强制抢占

操作系统使用的是强制抢占,即线程执行超过一定时间后,不管是否执行完,强制换上别的线程,以实现公平

Go 1.2 协作式调度 / 请求式抢占

Go 的抢占调度实现原理就是给 G 里放了一个标记字段,标记它是否可以被抢占。执行前检查这个字段,如果为 true,就表示可以暂停它,让出当前 CPU 给别的协程

  • Go 调度在 go1.2 实现了抢占,应该更精确的称为请求式抢占,那是因为 Go 调度器的抢占和 OS 的线程抢占比起来很柔和,不暴力,不会说线程时间片到了,或者更高优先级的任务到了,强制暂停当前任务给其他的任务。

  • 抢占请求需要满足 2个条件中的1个:

    1. G进行系统调用超过20us

    2. G运行超过10ms。

  • 调度器在启动的时候会启动一个单独的线程 sysmon,它负责所有的监控工作,其中1项就是抢占,发现满足抢占条件的 G 时,就发出抢占请求。

实现机制

  • 编译阶段,编译器会在一些有明显消耗的函数头部插入一些栈增长检测代码,用来做扩栈和调度相关的判断。

  • 函数执行时,通过检测 g.stackguard0 == 0,来判断是否要扩栈。该标志也用来判断是否该做调度,如果 g.stackguard0 == stackPreempt,则执行调度。(在 Go1.14 里使用 g.preempt 字段代替)

    • 调度时还会检查 gcwating 标志,如果 gcwating == 1, (表示 GC 正在等待执行 STW 操作),便会让出当前 P

  • 因此,对于非函数调用 (例如某个 G 执行的是简单 for 循环),即使设置了抢占标志,该 G 也不会执行栈扩容检测,就会一直霸占 M ,无法完成调度

Go 1.14 信号式抢占

  • 程序启动时,会注册函数,监听 SIGURG 信号并绑定处理方法 runtime.doSigPreempt

  • 需要调度时,发送 SIGURG 信号

  • 监听函数收到 SIGURG 信号,执行调度 schedule()

调度时机

  1. Go 后台监控 runtime.sysmon 检测超时发送抢占信号

  2. Go GC 栈扫描发送抢占信号

    • GC Root 进行标记的时候会扫描 G 的栈,扫描之前会调用 suspendG 挂起 G 的执行才进行扫描

    • 会将处于运行状态的 GpreemptStop 标记成为 true,并调用 runtime.preemptM 会调用 runtime.signalM 向线程发送信号 SIGURG

    • 扫描完毕之后再次调用 resumeG 恢复执行。

  3. Go GC STW 的时候调用 preemptall 抢占所有 P,让其暂停

源码

sysmon

调度器在启动的时候会启动一个单独的后台监控线程 sysmon,它负责所有的监控工作,其中一项就是抢占,发现满足抢占条件的 G 时,就发出抢占请求。

sysmon 每隔段时间就会检查一次,间隔时间从 20us 递增,最多 10ms。成功调度后会恢复回 20us

如果 sysmon 发现一个协程执行超过 10ms 了,就会发出抢占信号 SIGURG

此外,程序在启动时会启动函数监听 SIGURG。收到这个信号后,执行 runtime.schedule() 函数触发调度。

func sysmon() {
    //无限循环
    for {
        //渐进式休眠,最短20us,最长10ms
        if idle == 0 { 
            delay = 20
        } else if idle > 50 { //前50轮休眠时间都是20us,50轮后double
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        usleep(delay) //休眠
        
        // retake 对所有的 G 进行重整
        if retake(now) != 0 {
            idle = 0 //retake成功则重置轮数
        } else {
            idle++ //否则轮数+1
        }
    }
}

//retake 对所有的 G 进行重整,发起抢占
func retake(now int64) uint32 {
    //加锁
    lock(&allpLock)

    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall { 
            // P 处于系统调用之中,抢占检查条件比较复杂,略
            // 与下面的区别是,syscall 本就是通过 g0 无 P 执行的,因此这个 P 此时挂在这个 M 上是浪费的,需要把这个 P 给别的 M
        } else if s == _Prunning { // P处于运行状态,检查其是否运行得太久了
            // forcePreemptNS = 10ms,如果协程独占P超过10ms,就需要进行抢占了
            if pd.schedwhen+forcePreemptNS <= now {
                preemptone(_p_)    // 修改 G 的标记为可被抢占
                sysretake = true
            }
        }
    }
}

发送调度信号

preemptone() 里修改了 Gpreempt 字段为可被抢占,然后调用 signalM 发出信号量

//抢占
func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    gp := mp.curg
    
    // 标记 G 的抢占标识为 true
    // (在1.14之前的版本协作式抢占中,这个标记设置好了,就只能干等着,直到 morestask (函数调用)或者gopark(IO操作,例如channel之类)才会检查)
    gp.preempt = true

    // 1.14及之后,会直接调用 preemptM 信号通知线程,即所谓信号式抢占
    if preemptMSupported && debug.asyncpreemptoff == 0 {
        _p_.preempt = true
        preemptM(mp)
    }
}

//发出抢占信号
func preemptM(mp *m) {
    // CAS保证并发安全
    if atomic.Cas(&mp.signalPending, 0, 1) {
        //发出信号
        //const sigPreempt = _SIGURG,SIGURG是Linux下几乎没用的一个信号量,被go拿来当作抢占信号了
        signalM(mp, sigPreempt)
    }
}

接收调度信号

程序初始化时,会注册函数监听 SIGURG 信号

//程序初始化时,监听SIGURG信号
//src/runtime/proc.go
func mstartm0() {
    initsig(false)
}

//src/runtime/signal_unix.go
func initsig(preinit bool) {
    // 对于一个需要设置 sighandler 的信号,会通过 setsig 来设置信号对应的动作(action):
    setsig(i, funcPC(sighandler))
}

//runtime/preempt.go
//这个函数里包含了对各种信号量的处理,其中就包括SIGURG
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    if sig == sigPreempt && debug.asyncpreemptoff == 0 {
        doSigPreempt(gp, c)
    }
}

func doSigPreempt(gp *g, ctxt *sigctxt) {
    // 判断异步安全点
    if wantAsyncPreempt(gp) {
        if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
            // Adjust the PC and inject a call to asyncPreempt.
            ctxt.pushCall(funcPC(asyncPreempt), newpc)
        }
    }

    // Acknowledge the preemption.
    atomic.Xadd(&gp.m.preemptGen, 1)
    atomic.Store(&gp.m.signalPending, 0)
}

// asyncPreempt saves all user registers and calls asyncPreempt2.
// When stack scanning encounters an asyncPreempt frame, it scans that
// frame and its parent frame conservatively.
// 这个函数是用汇编实现的,实际调用了asyncPreempt2
func asyncPreempt()

//go:nosplit
func asyncPreempt2() {
    gp := getg()
    gp.asyncSafePoint = true
    if gp.preemptStop {    // GC触发的抢占。GC时,会暂停所有G的抢占,所以叫 preemptStop。只有 GC 自己能抢占
        mcall(preemptPark) // 里面调用的 runtime.schedule()
    } else {
        mcall(gopreempt_m) // 里面调用的 goschedImpl,goschedImpl 调用的runtime.schedule()
    }
    gp.asyncSafePoint = false
}

收到信号后的处理

asyncPreempt2() 里,如果是 GC 触发的抢占,就执行 preemptPark,否则执行 gopreempt_m。里面最终都会调用 schedule()

preemptPark 方法会解绑 MG 的关系,封存当前协程,执行 runtime.schedule() 来重新调度,获取可执行的协程,至于被抢占的协程后面会去重启。

// runtime/proc.go
// preemptPark parks gp and puts it in _Gpreempted.
func preemptPark(gp *g) {
    casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted)     // 修改 G 的状态为抢占中
    dropg()                                                  // 解绑 M 和 G
    casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted) // 修改 G 的状态为被抢占
    schedule()                                               // 调度
}

// 解绑 M 和 G
func dropg() {
    _g_ := getg()

    setMNoWB(&_g_.m.curg.m, nil) // 将 G 的 M 设置为 nil
    setGNoWB(&_g_.m.curg, nil)   // 将 M 的 curg 设置为 nil
}

goschedImpl 操作就简单的多,把当前协程的状态从 _Grunning 正在执行改成 _Grunnable 可执行,使用 globrunqput 方法把抢占的协程放到全局队列里,根据 MPG 的协程调度设计,全局队列要后于本地队列被调度。

func gopreempt_m(gp *g) {
    goschedImpl(gp)
}

func goschedImpl(gp *g) {
    casgstatus(gp, _Grunning, _Grunnable) //修改G的状态为可运行
    dropg()                               //解绑M和G
    lock(&sched.lock)                     //加锁
    globrunqput(gp)                       //把g放入全局队列
    unlock(&sched.lock)                   //解锁
    schedule()                            //调度
}

参考

深度解密Go语言之 scheduler

单核CPU下Go语言调度及抢占式调度的实现

golang 异步抢占例子

抢占式调度

峰云就她了-go1.14基于信号的抢占式调度实现原理

查漏补缺之Goroutine的调度

朴素的心态-单核CPU下Go语言调度及抢占式调度的实现

Last updated