diva-notes
  • README
  • Ads
    • 定价策略
    • 广告层级
    • 归因模型
    • 买量
    • Chat GPT
    • Google
  • AI
    • 参考资料
    • Chat GPT
    • stable-diffusion-webui安装
  • Algorithm
    • 倍增
    • 并查集
    • 参考
    • 环的判断
    • 凸包
    • 蓄水池抽样
    • 最短路径
    • 最小生成树
    • KMP算法
    • Rabin-Karp算法
    • Tarjan桥算法
  • Architecture
    • Serverless
  • Career
  • CICD
    • 代码质量
    • CICD实践
  • Data Structure
    • 布谷鸟过滤器
    • 布隆过滤器
    • 浮点
    • 红黑树
    • 锁
    • LSM树
  • DB
    • My SQL
      • 隔离级别
      • 架构
      • 索引
      • 锁
      • 页结构
      • 主从同步
      • ACID
      • Log
      • MVCC
      • Questions
    • Postgres
      • 持久化
      • 对比MySQL
      • 隔离级别
      • 索引
      • Greenpulm
      • MVCC
    • 倒排索引
    • 列式存储
    • H Base
    • HDFS
    • MPP数据库选型
    • Questions
  • Distributed System
    • 分布式事务
    • 服务网格
    • BASE理论
    • CAP
    • Etcd
    • Raft协议
    • ZAB协议
  • Go
    • 1.语言基础
      • 1.CPU寄存器
      • 2-1.函数调用
      • 2-2.函数调用栈
      • 2.接口
      • 3.汇编
      • 4.调试
    • 2.编译
      • 1.编译
      • 2.词法与语法分析
      • 3.类型检查
      • 4.中间代码生成
      • 5.机器码生成
    • 3.数据结构
      • 1.数组array
      • 2.切片slice
      • 3.哈希表map
      • 4.字符串
    • 4.常用关键字
      • 1.循环
      • 2.defer
      • 3.panic和recover
      • 4.make和new
    • 5.并发编程
      • 1.上下文Context的实现
      • 2-1.runtime.sema信号量
      • 2-2.sync.Mutex的实现
      • 2-3.sync.WaitGroup
      • 2-4.sync.Once的实现
      • 2-5.sync.Map的实现
      • 2-6.sync.Cond
      • 2-7.sync.Pool的实现
      • 2-8.sync.Semaphore的实现
      • 2-9.sync.ErrGroup
      • 3.定时器Timer的实现
      • 4.Channel的实现
      • 5-1.调度-线程
      • 5-2.调度-MPG
      • 5-3.调度-程序及调度启动
      • 5-4.调度-调度策略
      • 5-5.调度-抢占
      • 6.netpoll实现
      • 7.atomic
    • 6.内存管理
      • 1-1.内存分配基础-TCmalloc
      • 1-2.内存分配
      • 2.垃圾回收
      • 3.栈内存管理
    • 参考
    • 各版本特性
    • 坑
    • Go程序性能优化
    • http.Client
    • net.http路由
    • profile采样的实现
    • Questions
    • time的设计
  • Kafka
    • 高可用
    • 架构
    • 消息队列选型
    • ISR
    • Questions
  • Network
    • ARP
    • DNS
    • DPVS
    • GET和POST
    • HTTP 2
    • HTTP 3
    • HTTPS
    • LVS的转发模式
    • NAT
    • Nginx
    • OSI七层模型
    • Protobuf
    • Questions
    • REST Ful
    • RPC
    • socket缓冲区
    • socket详解
    • TCP滑动窗口
    • TCP连接建立源码
    • TCP连接四元组
    • TCP三次握手
    • TCP数据结构
    • TCP四次挥手
    • TCP拥塞控制
    • TCP重传机制
    • UDP
  • OS
    • 磁盘IO
    • 调度
    • 进程VS线程
    • 零拷贝
    • 内存-虚拟内存
    • 内存分配
    • 用户态VS内核态
    • 中断
    • COW写时复制
    • IO多路复用
    • Questions
  • Redis
    • 安装
    • 参考
    • 高可用-持久化
    • 高可用-主从同步
    • 高可用-Cluster
    • 高可用-Sentinel
    • 缓存一致性
    • 事务
    • 数据结构-SDS
    • 数据结构-Skiplist
    • 数据结构-Ziplist
    • 数据结构
    • 数据类型-Hashtable
    • 数据类型-List
    • 数据类型-Set
    • 数据类型-Zset
    • 数据淘汰机制
    • 通信协议-RESP
    • Questions
    • Redis6.0多线程
    • Redis分布式锁
    • Redis分片
  • System Design
    • 本地缓存
    • 错误处理
    • 大文件处理
    • 点赞收藏关注
    • 短链接生成系统
    • 负载均衡
    • 高并发高可用
    • 规则引擎
    • 集卡活动
    • 秒杀系统
    • 评论系统
    • 熔断
    • 限流
    • 延迟队列
    • Docker
    • ES
    • K 8 S
    • Node.js
    • Questions
  • Work
    • Bash
    • Charles
    • Code Review
    • Ffmpeg
    • Git
    • intellij插件
    • I Term 2
    • Mac
    • mysql命令
    • Nginx
    • postgresql命令
    • Protoc
    • Ssh
    • Systemd
    • Tcp相关命令
    • Vim
Powered by GitBook
On this page
  • 强制抢占
  • Go 1.2 协作式调度 / 请求式抢占
  • Go 1.14 信号式抢占
  • 调度时机
  • 源码
  • sysmon
  • 发送调度信号
  • 接收调度信号
  • 收到信号后的处理
  1. Go
  2. 5.并发编程

5-5.调度-抢占

单核CPU,开两个 goroutine,其中一个死循环,会怎么样?

答案是:死循环的 goroutine 卡住了,但是完全不影响另一个 goroutine 和主 goroutine 的运行。

go1.14 版本实现了基于信号的抢占式调度,可以在 goroutine 执行时间过长的时候强制让出调度,执行其他的 goroutine。接下来看看具体怎么实现的。基于 go1.15 linux amd64。

先看一个常规的例子

func main() {
    // 只有一个 P 了,目前运行主 goroutine
    runtime.GOMAXPROCS(1)

    // 创建一个 G1 , runtime.newproc -> runqput(_p_, newg, true) 放入到 P.runnext 字段
    go f1()

    // 因为只有一个 P 主 goroutine继续运行

    // 创建一个 G2 , 同上面,也是加入到队列头部,这时候本地队列的顺序就是 G2 G1
    go f2()

    // 等待 f1 f2的执行, gopark 主 goroutine, GMP 调度可运行的 G
    // 按顺序调用 G2 G1
    // 所以不管执行多少次,结果都是
    // This is f2
    // This is f1
    // success
    time.Sleep(100 * time.Millisecond)
    fmt.Println("success")
}

如果将 runtime.GOMAXPROCS(1) 改成 runtime.GOMAXPROCS(4) 即多核CPU,你就会发现 f1 和 f2 交替执行,没有明确的先后。因为此时有4个 P,两个 M 可以分别运行自己的 P 并行执行 f1 和 f2。

既然每次都是 f2 先执行,那在 f2 中加入一个死循环会怎么样呢?

func f1() {
    fmt.Println("This is f1")
}

func f2() {
    // 死循环
    for {

    }
    fmt.Println("This is f2")
}

func main() {
    runtime.GOMAXPROCS(1)

    go f1()
    go f2()

    time.Sleep(100 * time.Millisecond)
    fmt.Println("success")
}

// This is f1
// success

这段代码在 go 1.14 中可以正常执行,但 go 1.13 中则会卡住。因为之前版本在执行栈容量检查时才会检查是否需要抢占,对于 f2 里这种简单的 for 循环不会触发栈容量检查,就会导致不触发抢占。

强制抢占

操作系统使用的是强制抢占,即线程执行超过一定时间后,不管是否执行完,强制换上别的线程,以实现公平

Go 1.2 协作式调度 / 请求式抢占

Go 的抢占调度实现原理就是给 G 里放了一个标记字段,标记它是否可以被抢占。执行前检查这个字段,如果为 true,就表示可以暂停它,让出当前 CPU 给别的协程

  • Go 调度在 go1.2 实现了抢占,应该更精确的称为请求式抢占,那是因为 Go 调度器的抢占和 OS 的线程抢占比起来很柔和,不暴力,不会说线程时间片到了,或者更高优先级的任务到了,强制暂停当前任务给其他的任务。

  • 抢占请求需要满足 2个条件中的1个:

    1. G进行系统调用超过20us

    2. G运行超过10ms。

  • 调度器在启动的时候会启动一个单独的线程 sysmon,它负责所有的监控工作,其中1项就是抢占,发现满足抢占条件的 G 时,就发出抢占请求。

实现机制

  • 编译阶段,编译器会在一些有明显消耗的函数头部插入一些栈增长检测代码,用来做扩栈和调度相关的判断。

  • 函数执行时,通过检测 g.stackguard0 == 0,来判断是否要扩栈。该标志也用来判断是否该做调度,如果 g.stackguard0 == stackPreempt,则执行调度。(在 Go1.14 里使用 g.preempt 字段代替)

    • 调度时还会检查 gcwating 标志,如果 gcwating == 1, (表示 GC 正在等待执行 STW 操作),便会让出当前 P

  • 因此,对于非函数调用 (例如某个 G 执行的是简单 for 循环),即使设置了抢占标志,该 G 也不会执行栈扩容检测,就会一直霸占 M ,无法完成调度

Go 1.14 信号式抢占

  • 程序启动时,会注册函数,监听 SIGURG 信号并绑定处理方法 runtime.doSigPreempt

  • 需要调度时,发送 SIGURG 信号

  • 监听函数收到 SIGURG 信号,执行调度 schedule()

调度时机

  1. Go 后台监控 runtime.sysmon 检测超时发送抢占信号

  2. Go GC 栈扫描发送抢占信号

    • 对 GC Root 进行标记的时候会扫描 G 的栈,扫描之前会调用 suspendG 挂起 G 的执行才进行扫描

    • 会将处于运行状态的 G 的 preemptStop 标记成为 true,并调用 runtime.preemptM 会调用 runtime.signalM 向线程发送信号 SIGURG

    • 扫描完毕之后再次调用 resumeG 恢复执行。

  3. Go GC STW 的时候调用 preemptall 抢占所有 P,让其暂停

源码

sysmon

调度器在启动的时候会启动一个单独的后台监控线程 sysmon,它负责所有的监控工作,其中一项就是抢占,发现满足抢占条件的 G 时,就发出抢占请求。

sysmon 每隔段时间就会检查一次,间隔时间从 20us 递增,最多 10ms。成功调度后会恢复回 20us。

如果 sysmon 发现一个协程执行超过 10ms 了,就会发出抢占信号 SIGURG。

此外,程序在启动时会启动函数监听 SIGURG。收到这个信号后,执行 runtime.schedule() 函数触发调度。

func sysmon() {
    //无限循环
    for {
        //渐进式休眠,最短20us,最长10ms
        if idle == 0 { 
            delay = 20
        } else if idle > 50 { //前50轮休眠时间都是20us,50轮后double
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        usleep(delay) //休眠
        
        // retake 对所有的 G 进行重整
        if retake(now) != 0 {
            idle = 0 //retake成功则重置轮数
        } else {
            idle++ //否则轮数+1
        }
    }
}

//retake 对所有的 G 进行重整,发起抢占
func retake(now int64) uint32 {
    //加锁
    lock(&allpLock)

    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall { 
            // P 处于系统调用之中,抢占检查条件比较复杂,略
            // 与下面的区别是,syscall 本就是通过 g0 无 P 执行的,因此这个 P 此时挂在这个 M 上是浪费的,需要把这个 P 给别的 M
        } else if s == _Prunning { // P处于运行状态,检查其是否运行得太久了
            // forcePreemptNS = 10ms,如果协程独占P超过10ms,就需要进行抢占了
            if pd.schedwhen+forcePreemptNS <= now {
                preemptone(_p_)    // 修改 G 的标记为可被抢占
                sysretake = true
            }
        }
    }
}

发送调度信号

preemptone() 里修改了 G 的 preempt 字段为可被抢占,然后调用 signalM 发出信号量

//抢占
func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    gp := mp.curg
    
    // 标记 G 的抢占标识为 true
    // (在1.14之前的版本协作式抢占中,这个标记设置好了,就只能干等着,直到 morestask (函数调用)或者gopark(IO操作,例如channel之类)才会检查)
    gp.preempt = true

    // 1.14及之后,会直接调用 preemptM 信号通知线程,即所谓信号式抢占
    if preemptMSupported && debug.asyncpreemptoff == 0 {
        _p_.preempt = true
        preemptM(mp)
    }
}

//发出抢占信号
func preemptM(mp *m) {
    // CAS保证并发安全
    if atomic.Cas(&mp.signalPending, 0, 1) {
        //发出信号
        //const sigPreempt = _SIGURG,SIGURG是Linux下几乎没用的一个信号量,被go拿来当作抢占信号了
        signalM(mp, sigPreempt)
    }
}

接收调度信号

程序初始化时,会注册函数监听 SIGURG 信号

//程序初始化时,监听SIGURG信号
//src/runtime/proc.go
func mstartm0() {
    initsig(false)
}

//src/runtime/signal_unix.go
func initsig(preinit bool) {
    // 对于一个需要设置 sighandler 的信号,会通过 setsig 来设置信号对应的动作(action):
    setsig(i, funcPC(sighandler))
}

//runtime/preempt.go
//这个函数里包含了对各种信号量的处理,其中就包括SIGURG
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    if sig == sigPreempt && debug.asyncpreemptoff == 0 {
        doSigPreempt(gp, c)
    }
}

func doSigPreempt(gp *g, ctxt *sigctxt) {
    // 判断异步安全点
    if wantAsyncPreempt(gp) {
        if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
            // Adjust the PC and inject a call to asyncPreempt.
            ctxt.pushCall(funcPC(asyncPreempt), newpc)
        }
    }

    // Acknowledge the preemption.
    atomic.Xadd(&gp.m.preemptGen, 1)
    atomic.Store(&gp.m.signalPending, 0)
}

// asyncPreempt saves all user registers and calls asyncPreempt2.
// When stack scanning encounters an asyncPreempt frame, it scans that
// frame and its parent frame conservatively.
// 这个函数是用汇编实现的,实际调用了asyncPreempt2
func asyncPreempt()

//go:nosplit
func asyncPreempt2() {
    gp := getg()
    gp.asyncSafePoint = true
    if gp.preemptStop {    // GC触发的抢占。GC时,会暂停所有G的抢占,所以叫 preemptStop。只有 GC 自己能抢占
        mcall(preemptPark) // 里面调用的 runtime.schedule()
    } else {
        mcall(gopreempt_m) // 里面调用的 goschedImpl,goschedImpl 调用的runtime.schedule()
    }
    gp.asyncSafePoint = false
}

收到信号后的处理

在 asyncPreempt2() 里,如果是 GC 触发的抢占,就执行 preemptPark,否则执行 gopreempt_m。里面最终都会调用 schedule()

preemptPark 方法会解绑 M 和 G 的关系,封存当前协程,执行 runtime.schedule() 来重新调度,获取可执行的协程,至于被抢占的协程后面会去重启。

// runtime/proc.go
// preemptPark parks gp and puts it in _Gpreempted.
func preemptPark(gp *g) {
    casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted)     // 修改 G 的状态为抢占中
    dropg()                                                  // 解绑 M 和 G
    casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted) // 修改 G 的状态为被抢占
    schedule()                                               // 调度
}

// 解绑 M 和 G
func dropg() {
    _g_ := getg()

    setMNoWB(&_g_.m.curg.m, nil) // 将 G 的 M 设置为 nil
    setGNoWB(&_g_.m.curg, nil)   // 将 M 的 curg 设置为 nil
}

goschedImpl 操作就简单的多,把当前协程的状态从 _Grunning 正在执行改成 _Grunnable 可执行,使用 globrunqput 方法把抢占的协程放到全局队列里,根据 MPG 的协程调度设计,全局队列要后于本地队列被调度。

func gopreempt_m(gp *g) {
    goschedImpl(gp)
}

func goschedImpl(gp *g) {
    casgstatus(gp, _Grunning, _Grunnable) //修改G的状态为可运行
    dropg()                               //解绑M和G
    lock(&sched.lock)                     //加锁
    globrunqput(gp)                       //把g放入全局队列
    unlock(&sched.lock)                   //解锁
    schedule()                            //调度
}

参考

Previous5-4.调度-调度策略Next6.netpoll实现

Last updated 2 years ago

深度解密Go语言之 scheduler
单核CPU下Go语言调度及抢占式调度的实现
golang 异步抢占例子
抢占式调度
峰云就她了-go1.14基于信号的抢占式调度实现原理
查漏补缺之Goroutine的调度
朴素的心态-单核CPU下Go语言调度及抢占式调度的实现