TEXT ·Syscall(SB),NOSPLIT,$0-28
CALL runtime·entersyscall(SB)
...
CALL runtime·exitsyscall(SB)
RET
在 exitsyscall() 里会触发 schedule() 函数进行调度
2. 调度策略
schedule() 函数是调度逻辑的核心部分,该函数会暂停当前 G,保存现场、取下个 G 执行。
总体思路是:依次从 P 的本地 G 队列取、全局 G 队列取、其他 P 里偷 G:
调度器每调度 61 次,从全局队列里取一次 G(以避免全局队列里有的 G 始终不被取出、饿死)
调用 runqget 函数从 P 本地的运行队列中查找待执行的 G
调用 findrunnable 函数从其他地方取 G(依次从本地队列、全局队列、netpoll、从其他 P 里偷窃取 G)
先再次尝试从本地队列获取 G
去全局队列获取(因为前面仅仅是1/61的概率)
执行 netpoll,检查网络读写事件,判断是否有 IO 就绪的 G
如果还是没有,那么随机选择一个 P,偷其 runqueue 里的一半到自己的队列里
这里的随机用到了一种质数算法,保证既随机,每个 P 又都能被访问到
偷窃前会将 M 的自旋状态设为 true,偷窃后再改回去
如果多次尝试偷 P 都失败了,M 会把 P 放回 sched 的 空闲 P 数组,自身休眠(放回空闲 M 列表)
wakep: 另一种情况是,M 太忙了,如果 P 池子里有空闲的 P,会唤醒其他 sleep 状态的 M 一起干活。如果没有 sleep 状态的 M,runtime 会新建一个 M。
execute,执行代码。
schedule
// runtime/proc.go// 找出一个 G 来执行funcschedule() {top:// 检查定时器checkTimers(pp, 0)// 每调度 61次,从全局队列里取 G。防止全局队列里的 G 一直不被调度而饿死,保证公平性if gp ==nil {if _g_.m.p.ptr().schedtick%61==0&& sched.runqsize >0 {lock(&sched.lock) // 全局队列需要加锁 gp =globrunqget(_g_.m.p.ptr(), 1)unlock(&sched.lock) } }// 从本地队列里取 G,优先使用 P.runnext 字段if gp ==nil { gp, inheritTime =runqget(_g_.m.p.ptr()) }// 通过 findrunnable 函数从其他可能得地方寻找可执行 G,具体见下if gp ==nil { gp, inheritTime =findrunnable() }// 执行任务函数 execute(gp, inheritTime)}
findrunnable
依次从 本地队列、全局队列、netpoll、其他 P 获取可运行的 G。偷窃优先级最低,因为会影响其他 P 执行(需要加锁)。
// runtime/proc.go// 寻找可执行的 G// Tries to steal from other P's, get g from local or global queue, poll network.funcfindrunnable() (gp *g, inheritTime bool) { now, pollUntil, _ :=checkTimers(_p_, 0) // 检查 P 中的定时器,参考 Timer 的实现那篇文章// runqget 从本地队列取if gp, inheritTime :=runqget(_p_); gp !=nil {return gp, inheritTime }// globrunqget 从全局队列里取if sched.runqsize !=0 {lock(&sched.lock) // 全局队列需要加锁 gp :=globrunqget(_p_, 0) // 从全局队列队头弹出;另外会分一部分 G 给当前 Punlock(&sched.lock)if gp !=nil {return gp, false } }// 检查是否有就绪的 netpollerifnetpollinited() && atomic.Load(&netpollWaiters) >0&& atomic.Load64(&sched.lastpoll) !=0 {// 略 }// stealWork 从其他 P 偷窃if _g_.m.spinning ||2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) { gp, inheritTime, tnow, w, newWork :=stealWork(now) }...// 如果实在找不到 G,将 P 放回 sched.pidle 空闲队列,休眠 Mpidleput(_p_)stopm()}
globrunqget
从全局队列里取可用的 G
除返回一个可用 G 外,还会批量转移一批 G 到 P 的本地队列(按 P 的数量均分,最多取 P 本地队列的一半)。这样就省的频繁加锁来全局队列取。
// 从全局队列列取 G// 另外还会分一部分 G 给 P(分的 G 个数为 Min(全局队列长度/P个,或者P本地队列/2))funcglobrunqget(_p_ *p, max int32) *g {if sched.runqsize ==0 {returnnil }// 分给 P 的 G 个数为(全局队列长度/P个数,P本地队列长度/2,二者最小值)// 将全局队列按 P 个数等分 n := sched.runqsize/gomaxprocs +1if n > sched.runqsize { n = sched.runqsize }// 不能超过 runq 数组长度的一半if n >int32(len(_p_.runq))/2 { n =int32(len(_p_.runq)) /2 } sched.runqsize -= n // 调整计数 gp := sched.runq.pop() // 从全局队列头部弹出 G n--for ; n >0; n-- { // 分 n 个 G 到当前 P 的本地队列 gp1 := sched.runq.pop()runqput(_p_, gp1, false) }return gp}
stealWork
从其他 P 里偷一半的 G 过来
funcstealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) { pp :=getg().m.p.ptr()const stealTries =4for i :=0; i < stealTries; i++ {// 这里使用了一种质数算法,保证每个 P 都能被随机到// 随机一个起点,然后选取一个和长度互质的数,遍历时下标一直+这个质数并取模长度,就能不重复地遍历所有的元素for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { p2 := allp[enum.position()]// 这里也会偷窃 p2 的定时器 tnow, w, ran :=checkTimers(p2, now)// 偷窃 p2 的 Gif gp :=runqsteal(pp, p2, stealTimersOrRunNextG); gp !=nil {return gp, false, now, pollUntil, ranTimer } } }}// 调用 runqgrabfuncrunqsteal(_p_, p2 *p, stealRunNextG bool) *g { n :=runqgrab(p2, &_p_.runq, t, stealRunNextG)}// 从 P 里偷窃 Gfuncrunqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {for {// 先判断要偷的 P 本地队列长度 h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer n := t - h// n 为要偷的长度,偷目标队列的一半 n = n - n/2// 如果长度为0,看看它的 runnext 字段里有没有 Gif n ==0 {if stealRunNextG {if next := _p_.runnext; next !=0 {if!_p_.runnext.cas(next, 0) {continue } batch[batchHead%uint32(len(batch))] = nextreturn1 } }return0 }if n >uint32(len(_p_.runq)/2) { // n 大于 P 长度的一半,说明该 P 的本地队列正在急剧增长,重试continue }// 偷一半 G 放到 batch 切片里for i :=uint32(0); i < n; i++ { g := _p_.runq[(h+i)%uint32(len(_p_.runq))] batch[(batchHead+i)%uint32(len(batch))] = g }// 修改 P 本地队列的头指针if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume return n } }}