diva-notes
  • README
  • Ads
    • 定价策略
    • 广告层级
    • 归因模型
    • 买量
    • Chat GPT
    • Google
  • AI
    • 参考资料
    • Chat GPT
    • stable-diffusion-webui安装
  • Algorithm
    • 倍增
    • 并查集
    • 参考
    • 环的判断
    • 凸包
    • 蓄水池抽样
    • 最短路径
    • 最小生成树
    • KMP算法
    • Rabin-Karp算法
    • Tarjan桥算法
  • Architecture
    • Serverless
  • Career
  • CICD
    • 代码质量
    • CICD实践
  • Data Structure
    • 布谷鸟过滤器
    • 布隆过滤器
    • 浮点
    • 红黑树
    • 锁
    • LSM树
  • DB
    • My SQL
      • 隔离级别
      • 架构
      • 索引
      • 锁
      • 页结构
      • 主从同步
      • ACID
      • Log
      • MVCC
      • Questions
    • Postgres
      • 持久化
      • 对比MySQL
      • 隔离级别
      • 索引
      • Greenpulm
      • MVCC
    • 倒排索引
    • 列式存储
    • H Base
    • HDFS
    • MPP数据库选型
    • Questions
  • Distributed System
    • 分布式事务
    • 服务网格
    • BASE理论
    • CAP
    • Etcd
    • Raft协议
    • ZAB协议
  • Go
    • 1.语言基础
      • 1.CPU寄存器
      • 2-1.函数调用
      • 2-2.函数调用栈
      • 2.接口
      • 3.汇编
      • 4.调试
    • 2.编译
      • 1.编译
      • 2.词法与语法分析
      • 3.类型检查
      • 4.中间代码生成
      • 5.机器码生成
    • 3.数据结构
      • 1.数组array
      • 2.切片slice
      • 3.哈希表map
      • 4.字符串
    • 4.常用关键字
      • 1.循环
      • 2.defer
      • 3.panic和recover
      • 4.make和new
    • 5.并发编程
      • 1.上下文Context的实现
      • 2-1.runtime.sema信号量
      • 2-2.sync.Mutex的实现
      • 2-3.sync.WaitGroup
      • 2-4.sync.Once的实现
      • 2-5.sync.Map的实现
      • 2-6.sync.Cond
      • 2-7.sync.Pool的实现
      • 2-8.sync.Semaphore的实现
      • 2-9.sync.ErrGroup
      • 3.定时器Timer的实现
      • 4.Channel的实现
      • 5-1.调度-线程
      • 5-2.调度-MPG
      • 5-3.调度-程序及调度启动
      • 5-4.调度-调度策略
      • 5-5.调度-抢占
      • 6.netpoll实现
      • 7.atomic
    • 6.内存管理
      • 1-1.内存分配基础-TCmalloc
      • 1-2.内存分配
      • 2.垃圾回收
      • 3.栈内存管理
    • 参考
    • 各版本特性
    • 坑
    • Go程序性能优化
    • http.Client
    • net.http路由
    • profile采样的实现
    • Questions
    • time的设计
  • Kafka
    • 高可用
    • 架构
    • 消息队列选型
    • ISR
    • Questions
  • Network
    • ARP
    • DNS
    • DPVS
    • GET和POST
    • HTTP 2
    • HTTP 3
    • HTTPS
    • LVS的转发模式
    • NAT
    • Nginx
    • OSI七层模型
    • Protobuf
    • Questions
    • REST Ful
    • RPC
    • socket缓冲区
    • socket详解
    • TCP滑动窗口
    • TCP连接建立源码
    • TCP连接四元组
    • TCP三次握手
    • TCP数据结构
    • TCP四次挥手
    • TCP拥塞控制
    • TCP重传机制
    • UDP
  • OS
    • 磁盘IO
    • 调度
    • 进程VS线程
    • 零拷贝
    • 内存-虚拟内存
    • 内存分配
    • 用户态VS内核态
    • 中断
    • COW写时复制
    • IO多路复用
    • Questions
  • Redis
    • 安装
    • 参考
    • 高可用-持久化
    • 高可用-主从同步
    • 高可用-Cluster
    • 高可用-Sentinel
    • 缓存一致性
    • 事务
    • 数据结构-SDS
    • 数据结构-Skiplist
    • 数据结构-Ziplist
    • 数据结构
    • 数据类型-Hashtable
    • 数据类型-List
    • 数据类型-Set
    • 数据类型-Zset
    • 数据淘汰机制
    • 通信协议-RESP
    • Questions
    • Redis6.0多线程
    • Redis分布式锁
    • Redis分片
  • System Design
    • 本地缓存
    • 错误处理
    • 大文件处理
    • 点赞收藏关注
    • 短链接生成系统
    • 负载均衡
    • 高并发高可用
    • 规则引擎
    • 集卡活动
    • 秒杀系统
    • 评论系统
    • 熔断
    • 限流
    • 延迟队列
    • Docker
    • ES
    • K 8 S
    • Node.js
    • Questions
  • Work
    • Bash
    • Charles
    • Code Review
    • Ffmpeg
    • Git
    • intellij插件
    • I Term 2
    • Mac
    • mysql命令
    • Nginx
    • postgresql命令
    • Protoc
    • Ssh
    • Systemd
    • Tcp相关命令
    • Vim
Powered by GitBook
On this page
  • 结构体
  • 接口
  • 小结
  1. Go
  2. 5.并发编程

2-3.sync.WaitGroup

Previous2-2.sync.Mutex的实现Next2-4.sync.Once的实现

Last updated 2 years ago

可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:

requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done()
        // res, err := service.call(r)
    }(request)
}
wg.Wait()

结构体

结构体中只包含两个成员变量,一个是计数器,一个是信号量,甚至连锁都没有

type WaitGroup struct {
    noCopy noCopy     // 保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝
  state1 uint64     // 实际为2个 uint32 计数器,调用 Add() 方法时修改第一个计数器,调用 Wait() 方法时增加第二个计数器,表示阻塞的协程数
    state2 uint32     // 信号量
}

接口

Add() 方法做两件事:

  1. 增加或扣减计数器(Add(正数) 时增加,Done() 时扣减)

  2. 扣减计数器后检查计数器,在计数器扣减为 0 时唤醒所有睡眠的协程

func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state() 
    state := atomic.AddUint64(statep, uint64(delta)<<32) // 将传入的 delta 参数加到高32位,即第一个计数器中。由于结构体里没有锁,因此使用 atomic
    v := int32(state >> 32) // 取高32位,第一个计数器
    w := uint32(state)      // 转为uint32取低32位,第二个计数器,表示阻塞的协程数
    ...
    // 计数器1 >0,或者没有协程调用 Wait() 方法时,可直接返回
    if v > 0 || w == 0 {
        return
    }
  
    // 计数器1 ==0 且计数器2 >0,唤醒睡眠的各个协程
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

Wait() 方法只做一件事:增加计数器并让协程进入休眠

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32) // 取高32位,第一个计数器
        // 计数器1 == 0时,可直接返回
        if v == 0 {
            return
        }
        // 否则给计数器2自增并睡眠
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
        // 陷入睡眠
            runtime_Semacquire(semap)
            return
        }
    }
}

小结

是一个特殊的私有结构体, 包中的分析器会在编译期间检查被拷贝的变量中是否包含 或者实现了 Lock 和 Unlock 方法,如果包含该结构体或者实现了对应的方法就会报出以下错误:copies lock value: sync.WaitGroup

对外暴露了三个方法:、 和 。

因为其中的 只是向 方法传入了 -1,所以我们重点分析另外两个方法,即 和 。

只是对 方法的简单封装,我们可以向 方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的协程

可以同时有多个协程等待当前 计数器的归零,这些 Goroutine 会被同时唤醒;

sync.WaitGroup
sync.WaitGroup
sync.noCopy
tools/go/analysis/passes/copylock
sync.noCopy
sync.WaitGroup
sync.WaitGroup.Add
sync.WaitGroup.Wait
sync.WaitGroup.Done
sync.WaitGroup.Done
sync.WaitGroup.Add
sync.WaitGroup.Add
sync.WaitGroup.Wait
sync.WaitGroup.Done
sync.WaitGroup.Add
sync.WaitGroup.Add
sync.WaitGroup