requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
type WaitGroup struct {
noCopy noCopy // 保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝
state1 uint64 // 实际为2个 uint32 计数器,调用 Add() 方法时修改第一个计数器,调用 Wait() 方法时增加第二个计数器,表示阻塞的协程数
state2 uint32 // 信号量
}
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32) // 将传入的 delta 参数加到高32位,即第一个计数器中。由于结构体里没有锁,因此使用 atomic
v := int32(state >> 32) // 取高32位,第一个计数器
w := uint32(state) // 转为uint32取低32位,第二个计数器,表示阻塞的协程数
...
// 计数器1 >0,或者没有协程调用 Wait() 方法时,可直接返回
if v > 0 || w == 0 {
return
}
// 计数器1 ==0 且计数器2 >0,唤醒睡眠的各个协程
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 取高32位,第一个计数器
// 计数器1 == 0时,可直接返回
if v == 0 {
return
}
// 否则给计数器2自增并睡眠
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 陷入睡眠
runtime_Semacquire(semap)
return
}
}
}