# 5-4.调度-调度策略

调度需要解决的两个核心问题：

1. 调度时机：什么时候会发生调度？
2. 调度策略：使用什么策略来挑选下一个进入运行的 `goroutine`

## 1. 调度时机

* 新建 `M` 后，启动 `M` 的 `mstart()` 函数里触发 `schedule()`
* 新建 `G` 并执行后，协程执行完毕退出时的 `goexit0()` 函数里触发 `schedule()`
* `gopark`：`channel` 这种结构在等待读写时，会使用 `gopark()` 休眠协程后会触发 `schedule()`
* 阻塞性系统调用，比如文件 IO，网络IO
  * `Golang` 重写了所有系统调用，封装了 `syscall` ，在其前后增加了 `entersyscall` 和 `exitsyscall`，进入系统调用前暂停当前协程、完成系统调用后在 `exitsyscall()` 里恢复其资源、进度，并触发 `schedule()`
* GC 触发抢占：GC 需要 STW 暂停所有 `G` ，GC 时会发送抢占信号。注册函数收到信号后会执行 `preemptPark() -> schecule()`
* 其他抢占：注册函数收到抢占信号后执行 `gopreempt_m() -> goschedImpl() -> schedule()`
* 主动调用 `runtime.Gosched() -> goschedImpl()`，这个很少见，一般调试才用

#### gopark

该函数会调用 `mcall` 保存现场到 `G.sched` 字段，然后修改 `G` 状态为 `_Gwaiting`，并解绑 `G` 和 `M`，使之暂停运行。然后调用 `schedule()` 调度别的 `G` 起来运行

```go
// runtime/proc.go
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    mcall(park_m)
}

func park_m(gp *g) {
    _g_ := getg()

    casgstatus(gp, _Grunning, _Gwaiting) 
    dropg()     // 解绑 M 和 G

    schedule()  // 调度别的 G 起来运行
}

// runtime/asm_amd64.s
// 该函数会保存当前现场到 g.sched、然后切换到 g0 栈调用函数
TEXT runtime·mcall(SB), NOSPLIT, $0-4
```

#### goready

当通过 `gopark()` 休眠的协程满足条件、可以唤醒后（比如从 `channel` 中收到了数据），运行时会调用 `goready()` 唤醒协程

`goready` 会直接将 `G` 放回 `P.runnext`，使之以最高优先级恢复运行

```go
// runtime/proc.go
func goready(gp *g, traceskip int) {
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}

func ready(gp *g, traceskip int, next bool) {
    _g_ := getg()

    casgstatus(gp, _Gwaiting, _Grunnable) // 修改 G 的状态
    runqput(_g_.m.p.ptr(), gp, next)      // 将 G 放回 P 的 runnext 字段
    
    // 有空闲的 P 而且没有正在偷窃 的 M（自旋的M），则需要唤醒或新建一个 M 出来运行这个空闲 P
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 { 
        wakep()
    }
}

func wakep() {
    startm(nil, true) // 找一个 M 执行 P 的协程
}
```

#### goschedImpl

注册函数处理抢占信号 或 我们使用 `runtime.Gosched()` 主动暂停当前协程时都会调用 `goschedImpl` 函数。该函数逻辑和 `gopark` 差不多，只是比它多了一步 `globrunqput`，会将当前协程放入全局队列

```go
func goschedImpl(gp *g) {
    ...
    casgstatus(gp, _Grunning, _Grunnable)
    dropg()             // 设置当前m.curg = nil, gp.m = nil
    lock(&sched.lock)
    globrunqput(gp)     // 把gp放入sched的全局运行队列runq
    unlock(&sched.lock)

    schedule()          // 进入新一轮调度
}
```

#### 系统调用

系统调用也会触发运行时调度器的调度，为了处理特殊的系统调用，我们甚至在 Goroutine 中加入了 `_Gsyscall` 状态，Go 语言通过 [`syscall.Syscall`](https://draveness.me/golang/tree/syscall.Syscall) 和 [`syscall.RawSyscall`](https://draveness.me/golang/tree/syscall.RawSyscall) 等使用汇编语言编写的方法封装操作系统提供的所有系统调用，其中 [`syscall.Syscall`](https://draveness.me/golang/tree/syscall.Syscall) 的实现如下：

```assembly
TEXT ·Syscall(SB),NOSPLIT,$0-28
    CALL	runtime·entersyscall(SB)
    ...
    CALL	runtime·exitsyscall(SB)
    RET
```

在 `exitsyscall()` 里会触发 `schedule()` 函数进行调度

## 2. 调度策略

`schedule()` 函数是调度逻辑的核心部分，该函数会暂停当前 `G`，保存现场、取下个 `G` 执行。

总体思路是：依次从 `P` 的本地 `G` 队列取、全局 `G` 队列取、其他 `P` 里偷 `G`：

1. 调度器每调度 `61` 次，从全局队列里取一次 `G`（以避免全局队列里有的 `G` 始终不被取出、饿死）
2. 调用 `runqget` 函数从 `P` 本地的运行队列中查找待执行的 `G`
3. 调用 `findrunnable` 函数从其他地方取 `G`（依次从本地队列、全局队列、netpoll、从其他 `P` 里偷窃取 `G`）
   * 先再次尝试从本地队列获取 `G`
   * 去全局队列获取（因为前面仅仅是1/61的概率）
   * 执行 `netpoll`，检查网络读写事件，判断是否有 IO 就绪的 `G`
   * 如果还是没有，那么随机选择一个 `P`，偷其 `runqueue` 里的一半到自己的队列里
     * 这里的随机用到了一种质数算法，保证既随机，每个 `P` 又都能被访问到
     * 偷窃前会将 `M` 的自旋状态设为 `true`，偷窃后再改回去
     * 如果多次尝试偷 `P` 都失败了，`M` 会把 `P` 放回 `sched` 的 空闲 `P` 数组，自身休眠（放回空闲 `M` 列表）
4. `wakep`: 另一种情况是，`M` 太忙了，如果 `P` 池子里有空闲的 `P`，会唤醒其他 `sleep` 状态的 `M` 一起干活。如果没有 `sleep` 状态的 `M`，`runtime` 会新建一个 `M`。
5. `execute`，执行代码。

**schedule**

```go
// runtime/proc.go
// 找出一个 G 来执行
func schedule() {

top:

    // 检查定时器
    checkTimers(pp, 0)
   
    // 每调度 61次，从全局队列里取 G。防止全局队列里的 G 一直不被调度而饿死，保证公平性
    if gp == nil {
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock) // 全局队列需要加锁
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }

    // 从本地队列里取 G，优先使用 P.runnext 字段
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
    }
    
    // 通过 findrunnable 函数从其他可能得地方寻找可执行 G，具体见下
    if gp == nil {
        gp, inheritTime = findrunnable() 
    }
    
  
    // 执行任务函数 
    execute(gp, inheritTime)
}
```

**findrunnable**

依次从 本地队列、全局队列、netpoll、其他 `P` 获取可运行的 `G`。偷窃优先级最低，因为会影响其他 `P` 执行（需要加锁）。

```go
// runtime/proc.go
// 寻找可执行的 G
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
    now, pollUntil, _ := checkTimers(_p_, 0) // 检查 P 中的定时器，参考 Timer 的实现那篇文章
    
    // runqget 从本地队列取
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // globrunqget 从全局队列里取
    if sched.runqsize != 0 {
        lock(&sched.lock)          // 全局队列需要加锁        
        gp := globrunqget(_p_, 0)  // 从全局队列队头弹出；另外会分一部分 G 给当前 P
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }
  
    // 检查是否有就绪的 netpoller
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        // 略
    }

    // stealWork 从其他 P 偷窃
    if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
        gp, inheritTime, tnow, w, newWork := stealWork(now)
    }
    
    ...
    // 如果实在找不到 G，将 P 放回 sched.pidle 空闲队列，休眠 M
    pidleput(_p_)
    stopm()
}
```

**globrunqget**

从全局队列里取可用的 `G`

除返回一个可用 `G` 外，还会批量转移一批 `G` 到 `P` 的本地队列（按 `P` 的数量均分，最多取 `P` 本地队列的一半）。这样就省的频繁加锁来全局队列取。

```go
// 从全局队列列取 G
// 另外还会分一部分 G 给 P（分的 G 个数为 Min（全局队列长度/P个，或者P本地队列/2））
func globrunqget(_p_ *p, max int32) *g {
    if sched.runqsize == 0 {
        return nil
    }

    // 分给 P 的 G 个数为（全局队列长度/P个数，P本地队列长度/2，二者最小值）
    // 将全局队列按 P 个数等分
    n := sched.runqsize/gomaxprocs + 1
    if n > sched.runqsize {
        n = sched.runqsize
    }
    // 不能超过 runq 数组长度的一半
    if n > int32(len(_p_.runq))/2 {
        n = int32(len(_p_.runq)) / 2
    }

    sched.runqsize -= n    // 调整计数

    gp := sched.runq.pop() // 从全局队列头部弹出 G
    n--
    for ; n > 0; n-- {     // 分 n 个 G 到当前 P 的本地队列
        gp1 := sched.runq.pop()
        runqput(_p_, gp1, false)
    }
    return gp
}
```

**stealWork**

从其他 `P` 里偷一半的 `G` 过来

```go
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
    pp := getg().m.p.ptr()
    
    const stealTries = 4
    for i := 0; i < stealTries; i++ {
        // 这里使用了一种质数算法，保证每个 P 都能被随机到
        // 随机一个起点，然后选取一个和长度互质的数，遍历时下标一直+这个质数并取模长度，就能不重复地遍历所有的元素
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            p2 := allp[enum.position()]
            
            // 这里也会偷窃 p2 的定时器
            tnow, w, ran := checkTimers(p2, now)
            
            // 偷窃 p2 的 G
            if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
                return gp, false, now, pollUntil, ranTimer
            }
        }
    }
}

// 调用 runqgrab
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
    n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
}

// 从 P 里偷窃 G
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
    for {
        // 先判断要偷的 P 本地队列长度
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
        n := t - h
        // n 为要偷的长度，偷目标队列的一半
        n = n - n/2
        
        // 如果长度为0，看看它的 runnext 字段里有没有 G
        if n == 0 {
            if stealRunNextG {
                if next := _p_.runnext; next != 0 {
                    if !_p_.runnext.cas(next, 0) {
                        continue
                    }
                    batch[batchHead%uint32(len(batch))] = next
                    return 1
                }
            }
            return 0
        }
        
        if n > uint32(len(_p_.runq)/2) { // n 大于 P 长度的一半，说明该 P 的本地队列正在急剧增长，重试
            continue
        }

        // 偷一半 G 放到 batch 切片里
        for i := uint32(0); i < n; i++ {
            g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
            batch[(batchHead+i)%uint32(len(batch))] = g
        }
        
        // 修改 P 本地队列的头指针
        if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume 
             return n 
        }
    }
}
```

#### 调度循环

`schedule()` 调度中会通过 `excute()` 执行函数，函数执行完毕后在退出函数 `goexit0()` 再次触发调度，这样就完成了一次循环：

`schedule() -> execute() -> gogo() -> g2() -> goexit() -> goexit1() -> mcall() -> goexit0() -> schedule()`

**问题：上述调度可能会进行无数次循环，是否会耗尽栈空间？**

`schedule()` 函数开始之后的一系列函数永远都不会返回，每次执行 `mcall` 会切换到 `g0` 栈运行，会切换到 `g0.sched.sp` 所指的固定位置，即重用这些函数上一轮调度时所使用过的栈内存，因此不会耗尽空间。

## 优点

* 调度是在用户态下完成的， 不涉及内核态与用户态之间的频繁切换
* 不需要频繁的进行内存的分配与释放。在用户态维护着一块大的内存池， 不直接调用系统的 `malloc` 函数（除非内存池需要改变）
* 另一方面充分利用了多核的硬件资源，近似的把若干 `goroutine` 均分在物理线程上
* 再加上本身 `goroutine` 的超轻量，以上种种保证了 `go` 调度方面的性能

## 参考

[Go 程序员面试笔试宝典 - 主 goroutine 如何创建](https://golang.design/go-questions/sched/main-goroutine/)

[张方波 - 第三章 Goroutine调度策略（16）](https://mp.weixin.qq.com/s/2objs5JrlnKnwFbF4a2z2g)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://wtifs.gitbook.io/diva-notes/go/5.-bing-fa-bian-cheng/54.-tiao-du-tiao-du-ce-lve.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
