TEXT ·Syscall(SB),NOSPLIT,$0-28
CALL runtime·entersyscall(SB)
...
CALL runtime·exitsyscall(SB)
RET
在 exitsyscall() 里会触发 schedule() 函数进行调度
2. 调度策略
schedule() 函数是调度逻辑的核心部分,该函数会暂停当前 G,保存现场、取下个 G 执行。
总体思路是:依次从 P 的本地 G 队列取、全局 G 队列取、其他 P 里偷 G:
调度器每调度 61 次,从全局队列里取一次 G(以避免全局队列里有的 G 始终不被取出、饿死)
调用 runqget 函数从 P 本地的运行队列中查找待执行的 G
调用 findrunnable 函数从其他地方取 G(依次从本地队列、全局队列、netpoll、从其他 P 里偷窃取 G)
先再次尝试从本地队列获取 G
去全局队列获取(因为前面仅仅是1/61的概率)
执行 netpoll,检查网络读写事件,判断是否有 IO 就绪的 G
如果还是没有,那么随机选择一个 P,偷其 runqueue 里的一半到自己的队列里
这里的随机用到了一种质数算法,保证既随机,每个 P 又都能被访问到
偷窃前会将 M 的自旋状态设为 true,偷窃后再改回去
如果多次尝试偷 P 都失败了,M 会把 P 放回 sched 的 空闲 P 数组,自身休眠(放回空闲 M 列表)
wakep: 另一种情况是,M 太忙了,如果 P 池子里有空闲的 P,会唤醒其他 sleep 状态的 M 一起干活。如果没有 sleep 状态的 M,runtime 会新建一个 M。
execute,执行代码。
schedule
// runtime/proc.go
// 找出一个 G 来执行
func schedule() {
top:
// 检查定时器
checkTimers(pp, 0)
// 每调度 61次,从全局队列里取 G。防止全局队列里的 G 一直不被调度而饿死,保证公平性
if gp == nil {
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock) // 全局队列需要加锁
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 从本地队列里取 G,优先使用 P.runnext 字段
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
}
// 通过 findrunnable 函数从其他可能得地方寻找可执行 G,具体见下
if gp == nil {
gp, inheritTime = findrunnable()
}
// 执行任务函数
execute(gp, inheritTime)
}
findrunnable
依次从 本地队列、全局队列、netpoll、其他 P 获取可运行的 G。偷窃优先级最低,因为会影响其他 P 执行(需要加锁)。
// runtime/proc.go
// 寻找可执行的 G
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
now, pollUntil, _ := checkTimers(_p_, 0) // 检查 P 中的定时器,参考 Timer 的实现那篇文章
// runqget 从本地队列取
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// globrunqget 从全局队列里取
if sched.runqsize != 0 {
lock(&sched.lock) // 全局队列需要加锁
gp := globrunqget(_p_, 0) // 从全局队列队头弹出;另外会分一部分 G 给当前 P
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 检查是否有就绪的 netpoller
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
// 略
}
// stealWork 从其他 P 偷窃
if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
gp, inheritTime, tnow, w, newWork := stealWork(now)
}
...
// 如果实在找不到 G,将 P 放回 sched.pidle 空闲队列,休眠 M
pidleput(_p_)
stopm()
}
globrunqget
从全局队列里取可用的 G
除返回一个可用 G 外,还会批量转移一批 G 到 P 的本地队列(按 P 的数量均分,最多取 P 本地队列的一半)。这样就省的频繁加锁来全局队列取。
// 从全局队列列取 G
// 另外还会分一部分 G 给 P(分的 G 个数为 Min(全局队列长度/P个,或者P本地队列/2))
func globrunqget(_p_ *p, max int32) *g {
if sched.runqsize == 0 {
return nil
}
// 分给 P 的 G 个数为(全局队列长度/P个数,P本地队列长度/2,二者最小值)
// 将全局队列按 P 个数等分
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
// 不能超过 runq 数组长度的一半
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n // 调整计数
gp := sched.runq.pop() // 从全局队列头部弹出 G
n--
for ; n > 0; n-- { // 分 n 个 G 到当前 P 的本地队列
gp1 := sched.runq.pop()
runqput(_p_, gp1, false)
}
return gp
}
stealWork
从其他 P 里偷一半的 G 过来
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
pp := getg().m.p.ptr()
const stealTries = 4
for i := 0; i < stealTries; i++ {
// 这里使用了一种质数算法,保证每个 P 都能被随机到
// 随机一个起点,然后选取一个和长度互质的数,遍历时下标一直+这个质数并取模长度,就能不重复地遍历所有的元素
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
p2 := allp[enum.position()]
// 这里也会偷窃 p2 的定时器
tnow, w, ran := checkTimers(p2, now)
// 偷窃 p2 的 G
if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
return gp, false, now, pollUntil, ranTimer
}
}
}
}
// 调用 runqgrab
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
}
// 从 P 里偷窃 G
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
// 先判断要偷的 P 本地队列长度
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
n := t - h
// n 为要偷的长度,偷目标队列的一半
n = n - n/2
// 如果长度为0,看看它的 runnext 字段里有没有 G
if n == 0 {
if stealRunNextG {
if next := _p_.runnext; next != 0 {
if !_p_.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next
return 1
}
}
return 0
}
if n > uint32(len(_p_.runq)/2) { // n 大于 P 长度的一半,说明该 P 的本地队列正在急剧增长,重试
continue
}
// 偷一半 G 放到 batch 切片里
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
// 修改 P 本地队列的头指针
if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
}