2-3.sync.WaitGroup
sync.WaitGroup 可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()结构体
sync.WaitGroup 结构体中只包含两个成员变量,一个是计数器,一个是信号量,甚至连锁都没有
type WaitGroup struct {
noCopy noCopy // 保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝
state1 uint64 // 实际为2个 uint32 计数器,调用 Add() 方法时修改第一个计数器,调用 Wait() 方法时增加第二个计数器,表示阻塞的协程数
state2 uint32 // 信号量
}sync.noCopy 是一个特殊的私有结构体,tools/go/analysis/passes/copylock 包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopy 或者实现了 Lock 和 Unlock 方法,如果包含该结构体或者实现了对应的方法就会报出以下错误:copies lock value: sync.WaitGroup
接口
sync.WaitGroup 对外暴露了三个方法:sync.WaitGroup.Add、sync.WaitGroup.Wait 和 sync.WaitGroup.Done。
因为其中的 sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法传入了 -1,所以我们重点分析另外两个方法,即 sync.WaitGroup.Add 和 sync.WaitGroup.Wait。
Add() 方法做两件事:
增加或扣减计数器(
Add(正数)时增加,Done()时扣减)扣减计数器后检查计数器,在计数器扣减为
0时唤醒所有睡眠的协程
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32) // 将传入的 delta 参数加到高32位,即第一个计数器中。由于结构体里没有锁,因此使用 atomic
v := int32(state >> 32) // 取高32位,第一个计数器
w := uint32(state) // 转为uint32取低32位,第二个计数器,表示阻塞的协程数
...
// 计数器1 >0,或者没有协程调用 Wait() 方法时,可直接返回
if v > 0 || w == 0 {
return
}
// 计数器1 ==0 且计数器2 >0,唤醒睡眠的各个协程
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}Wait() 方法只做一件事:增加计数器并让协程进入休眠
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 取高32位,第一个计数器
// 计数器1 == 0时,可直接返回
if v == 0 {
return
}
// 否则给计数器2自增并睡眠
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 陷入睡眠
runtime_Semacquire(semap)
return
}
}
}小结
sync.WaitGroup.Done只是对sync.WaitGroup.Add方法的简单封装,我们可以向sync.WaitGroup.Add方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒等待的协程可以同时有多个协程等待当前
sync.WaitGroup计数器的归零,这些 Goroutine 会被同时唤醒;
Last updated