TEXT ·Syscall(SB),NOSPLIT,$0-28
CALL runtime·entersyscall(SB)
...
CALL runtime·exitsyscall(SB)
RET
// runtime/proc.go
// 找出一个 G 来执行
func schedule() {
top:
// 检查定时器
checkTimers(pp, 0)
// 每调度 61次,从全局队列里取 G。防止全局队列里的 G 一直不被调度而饿死,保证公平性
if gp == nil {
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock) // 全局队列需要加锁
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 从本地队列里取 G,优先使用 P.runnext 字段
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
}
// 通过 findrunnable 函数从其他可能得地方寻找可执行 G,具体见下
if gp == nil {
gp, inheritTime = findrunnable()
}
// 执行任务函数
execute(gp, inheritTime)
}
// runtime/proc.go
// 寻找可执行的 G
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
now, pollUntil, _ := checkTimers(_p_, 0) // 检查 P 中的定时器,参考 Timer 的实现那篇文章
// runqget 从本地队列取
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// globrunqget 从全局队列里取
if sched.runqsize != 0 {
lock(&sched.lock) // 全局队列需要加锁
gp := globrunqget(_p_, 0) // 从全局队列队头弹出;另外会分一部分 G 给当前 P
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 检查是否有就绪的 netpoller
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
// 略
}
// stealWork 从其他 P 偷窃
if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
gp, inheritTime, tnow, w, newWork := stealWork(now)
}
...
// 如果实在找不到 G,将 P 放回 sched.pidle 空闲队列,休眠 M
pidleput(_p_)
stopm()
}
// 从全局队列列取 G
// 另外还会分一部分 G 给 P(分的 G 个数为 Min(全局队列长度/P个,或者P本地队列/2))
func globrunqget(_p_ *p, max int32) *g {
if sched.runqsize == 0 {
return nil
}
// 分给 P 的 G 个数为(全局队列长度/P个数,P本地队列长度/2,二者最小值)
// 将全局队列按 P 个数等分
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
// 不能超过 runq 数组长度的一半
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n // 调整计数
gp := sched.runq.pop() // 从全局队列头部弹出 G
n--
for ; n > 0; n-- { // 分 n 个 G 到当前 P 的本地队列
gp1 := sched.runq.pop()
runqput(_p_, gp1, false)
}
return gp
}
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
pp := getg().m.p.ptr()
const stealTries = 4
for i := 0; i < stealTries; i++ {
// 这里使用了一种质数算法,保证每个 P 都能被随机到
// 随机一个起点,然后选取一个和长度互质的数,遍历时下标一直+这个质数并取模长度,就能不重复地遍历所有的元素
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
p2 := allp[enum.position()]
// 这里也会偷窃 p2 的定时器
tnow, w, ran := checkTimers(p2, now)
// 偷窃 p2 的 G
if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
return gp, false, now, pollUntil, ranTimer
}
}
}
}
// 调用 runqgrab
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
}
// 从 P 里偷窃 G
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
// 先判断要偷的 P 本地队列长度
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
n := t - h
// n 为要偷的长度,偷目标队列的一半
n = n - n/2
// 如果长度为0,看看它的 runnext 字段里有没有 G
if n == 0 {
if stealRunNextG {
if next := _p_.runnext; next != 0 {
if !_p_.runnext.cas(next, 0) {
continue
}
batch[batchHead%uint32(len(batch))] = next
return 1
}
}
return 0
}
if n > uint32(len(_p_.runq)/2) { // n 大于 P 长度的一半,说明该 P 的本地队列正在急剧增长,重试
continue
}
// 偷一半 G 放到 batch 切片里
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
// 修改 P 本地队列的头指针
if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
}