5-4.调度-调度策略

调度需要解决的两个核心问题:

  1. 调度时机:什么时候会发生调度?

  2. 调度策略:使用什么策略来挑选下一个进入运行的 goroutine

1. 调度时机

  • 新建 M 后,启动 Mmstart() 函数里触发 schedule()

  • 新建 G 并执行后,协程执行完毕退出时的 goexit0() 函数里触发 schedule()

  • goparkchannel 这种结构在等待读写时,会使用 gopark() 休眠协程后会触发 schedule()

  • 阻塞性系统调用,比如文件 IO,网络IO

    • Golang 重写了所有系统调用,封装了 syscall ,在其前后增加了 entersyscallexitsyscall,进入系统调用前暂停当前协程、完成系统调用后在 exitsyscall() 里恢复其资源、进度,并触发 schedule()

  • GC 触发抢占:GC 需要 STW 暂停所有 G ,GC 时会发送抢占信号。注册函数收到信号后会执行 preemptPark() -> schecule()

  • 其他抢占:注册函数收到抢占信号后执行 gopreempt_m() -> goschedImpl() -> schedule()

  • 主动调用 runtime.Gosched() -> goschedImpl(),这个很少见,一般调试才用

gopark

该函数会调用 mcall 保存现场到 G.sched 字段,然后修改 G 状态为 _Gwaiting,并解绑 GM,使之暂停运行。然后调用 schedule() 调度别的 G 起来运行

// runtime/proc.go
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    mcall(park_m)
}

func park_m(gp *g) {
    _g_ := getg()

    casgstatus(gp, _Grunning, _Gwaiting) 
    dropg()     // 解绑 M 和 G

    schedule()  // 调度别的 G 起来运行
}

// runtime/asm_amd64.s
// 该函数会保存当前现场到 g.sched、然后切换到 g0 栈调用函数
TEXT runtime·mcall(SB), NOSPLIT, $0-4

goready

当通过 gopark() 休眠的协程满足条件、可以唤醒后(比如从 channel 中收到了数据),运行时会调用 goready() 唤醒协程

goready 会直接将 G 放回 P.runnext,使之以最高优先级恢复运行

// runtime/proc.go
func goready(gp *g, traceskip int) {
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}

func ready(gp *g, traceskip int, next bool) {
    _g_ := getg()

    casgstatus(gp, _Gwaiting, _Grunnable) // 修改 G 的状态
    runqput(_g_.m.p.ptr(), gp, next)      // 将 G 放回 P 的 runnext 字段
    
    // 有空闲的 P 而且没有正在偷窃 的 M(自旋的M),则需要唤醒或新建一个 M 出来运行这个空闲 P
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 { 
        wakep()
    }
}

func wakep() {
    startm(nil, true) // 找一个 M 执行 P 的协程
}

goschedImpl

注册函数处理抢占信号 或 我们使用 runtime.Gosched() 主动暂停当前协程时都会调用 goschedImpl 函数。该函数逻辑和 gopark 差不多,只是比它多了一步 globrunqput,会将当前协程放入全局队列

func goschedImpl(gp *g) {
    ...
    casgstatus(gp, _Grunning, _Grunnable)
    dropg()             // 设置当前m.curg = nil, gp.m = nil
    lock(&sched.lock)
    globrunqput(gp)     // 把gp放入sched的全局运行队列runq
    unlock(&sched.lock)

    schedule()          // 进入新一轮调度
}

系统调用

系统调用也会触发运行时调度器的调度,为了处理特殊的系统调用,我们甚至在 Goroutine 中加入了 _Gsyscall 状态,Go 语言通过 syscall.Syscallsyscall.RawSyscall 等使用汇编语言编写的方法封装操作系统提供的所有系统调用,其中 syscall.Syscall 的实现如下:

TEXT ·Syscall(SB),NOSPLIT,$0-28
    CALL	runtime·entersyscall(SB)
    ...
    CALL	runtime·exitsyscall(SB)
    RET

exitsyscall() 里会触发 schedule() 函数进行调度

2. 调度策略

schedule() 函数是调度逻辑的核心部分,该函数会暂停当前 G,保存现场、取下个 G 执行。

总体思路是:依次从 P 的本地 G 队列取、全局 G 队列取、其他 P 里偷 G

  1. 调度器每调度 61 次,从全局队列里取一次 G(以避免全局队列里有的 G 始终不被取出、饿死)

  2. 调用 runqget 函数从 P 本地的运行队列中查找待执行的 G

  3. 调用 findrunnable 函数从其他地方取 G(依次从本地队列、全局队列、netpoll、从其他 P 里偷窃取 G

    • 先再次尝试从本地队列获取 G

    • 去全局队列获取(因为前面仅仅是1/61的概率)

    • 执行 netpoll,检查网络读写事件,判断是否有 IO 就绪的 G

    • 如果还是没有,那么随机选择一个 P,偷其 runqueue 里的一半到自己的队列里

      • 这里的随机用到了一种质数算法,保证既随机,每个 P 又都能被访问到

      • 偷窃前会将 M 的自旋状态设为 true,偷窃后再改回去

      • 如果多次尝试偷 P 都失败了,M 会把 P 放回 sched 的 空闲 P 数组,自身休眠(放回空闲 M 列表)

  4. wakep: 另一种情况是,M 太忙了,如果 P 池子里有空闲的 P,会唤醒其他 sleep 状态的 M 一起干活。如果没有 sleep 状态的 Mruntime 会新建一个 M

  5. execute,执行代码。

schedule

// runtime/proc.go
// 找出一个 G 来执行
func schedule() {

top:

    // 检查定时器
    checkTimers(pp, 0)
   
    // 每调度 61次,从全局队列里取 G。防止全局队列里的 G 一直不被调度而饿死,保证公平性
    if gp == nil {
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock) // 全局队列需要加锁
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }

    // 从本地队列里取 G,优先使用 P.runnext 字段
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
    }
    
    // 通过 findrunnable 函数从其他可能得地方寻找可执行 G,具体见下
    if gp == nil {
        gp, inheritTime = findrunnable() 
    }
    
  
    // 执行任务函数 
    execute(gp, inheritTime)
}

findrunnable

依次从 本地队列、全局队列、netpoll、其他 P 获取可运行的 G。偷窃优先级最低,因为会影响其他 P 执行(需要加锁)。

// runtime/proc.go
// 寻找可执行的 G
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
    now, pollUntil, _ := checkTimers(_p_, 0) // 检查 P 中的定时器,参考 Timer 的实现那篇文章
    
    // runqget 从本地队列取
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // globrunqget 从全局队列里取
    if sched.runqsize != 0 {
        lock(&sched.lock)          // 全局队列需要加锁        
        gp := globrunqget(_p_, 0)  // 从全局队列队头弹出;另外会分一部分 G 给当前 P
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }
  
    // 检查是否有就绪的 netpoller
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        // 略
    }

    // stealWork 从其他 P 偷窃
    if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
        gp, inheritTime, tnow, w, newWork := stealWork(now)
    }
    
    ...
    // 如果实在找不到 G,将 P 放回 sched.pidle 空闲队列,休眠 M
    pidleput(_p_)
    stopm()
}

globrunqget

从全局队列里取可用的 G

除返回一个可用 G 外,还会批量转移一批 GP 的本地队列(按 P 的数量均分,最多取 P 本地队列的一半)。这样就省的频繁加锁来全局队列取。

// 从全局队列列取 G
// 另外还会分一部分 G 给 P(分的 G 个数为 Min(全局队列长度/P个,或者P本地队列/2))
func globrunqget(_p_ *p, max int32) *g {
    if sched.runqsize == 0 {
        return nil
    }

    // 分给 P 的 G 个数为(全局队列长度/P个数,P本地队列长度/2,二者最小值)
    // 将全局队列按 P 个数等分
    n := sched.runqsize/gomaxprocs + 1
    if n > sched.runqsize {
        n = sched.runqsize
    }
    // 不能超过 runq 数组长度的一半
    if n > int32(len(_p_.runq))/2 {
        n = int32(len(_p_.runq)) / 2
    }

    sched.runqsize -= n    // 调整计数

    gp := sched.runq.pop() // 从全局队列头部弹出 G
    n--
    for ; n > 0; n-- {     // 分 n 个 G 到当前 P 的本地队列
        gp1 := sched.runq.pop()
        runqput(_p_, gp1, false)
    }
    return gp
}

stealWork

从其他 P 里偷一半的 G 过来

func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
    pp := getg().m.p.ptr()
    
    const stealTries = 4
    for i := 0; i < stealTries; i++ {
        // 这里使用了一种质数算法,保证每个 P 都能被随机到
        // 随机一个起点,然后选取一个和长度互质的数,遍历时下标一直+这个质数并取模长度,就能不重复地遍历所有的元素
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            p2 := allp[enum.position()]
            
            // 这里也会偷窃 p2 的定时器
            tnow, w, ran := checkTimers(p2, now)
            
            // 偷窃 p2 的 G
            if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
                return gp, false, now, pollUntil, ranTimer
            }
        }
    }
}

// 调用 runqgrab
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
    n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
}

// 从 P 里偷窃 G
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
    for {
        // 先判断要偷的 P 本地队列长度
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
        n := t - h
        // n 为要偷的长度,偷目标队列的一半
        n = n - n/2
        
        // 如果长度为0,看看它的 runnext 字段里有没有 G
        if n == 0 {
            if stealRunNextG {
                if next := _p_.runnext; next != 0 {
                    if !_p_.runnext.cas(next, 0) {
                        continue
                    }
                    batch[batchHead%uint32(len(batch))] = next
                    return 1
                }
            }
            return 0
        }
        
        if n > uint32(len(_p_.runq)/2) { // n 大于 P 长度的一半,说明该 P 的本地队列正在急剧增长,重试
            continue
        }

        // 偷一半 G 放到 batch 切片里
        for i := uint32(0); i < n; i++ {
            g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
            batch[(batchHead+i)%uint32(len(batch))] = g
        }
        
        // 修改 P 本地队列的头指针
        if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume 
             return n 
        }
    }
}

调度循环

schedule() 调度中会通过 excute() 执行函数,函数执行完毕后在退出函数 goexit0() 再次触发调度,这样就完成了一次循环:

schedule() -> execute() -> gogo() -> g2() -> goexit() -> goexit1() -> mcall() -> goexit0() -> schedule()

问题:上述调度可能会进行无数次循环,是否会耗尽栈空间?

schedule() 函数开始之后的一系列函数永远都不会返回,每次执行 mcall 会切换到 g0 栈运行,会切换到 g0.sched.sp 所指的固定位置,即重用这些函数上一轮调度时所使用过的栈内存,因此不会耗尽空间。

优点

  • 调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换

  • 不需要频繁的进行内存的分配与释放。在用户态维护着一块大的内存池, 不直接调用系统的 malloc 函数(除非内存池需要改变)

  • 另一方面充分利用了多核的硬件资源,近似的把若干 goroutine 均分在物理线程上

  • 再加上本身 goroutine 的超轻量,以上种种保证了 go 调度方面的性能

参考

Go 程序员面试笔试宝典 - 主 goroutine 如何创建

张方波 - 第三章 Goroutine调度策略(16)

Last updated