2-3.sync.WaitGroup

sync.WaitGroup 可以等待一组 Goroutine 的返回,一个比较常见的使用场景是批量发出 RPC 或者 HTTP 请求:

requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done()
        // res, err := service.call(r)
    }(request)
}
wg.Wait()

结构体

sync.WaitGroup 结构体中只包含两个成员变量,一个是计数器,一个是信号量,甚至连锁都没有

type WaitGroup struct {
    noCopy noCopy     // 保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝
  state1 uint64     // 实际为2个 uint32 计数器,调用 Add() 方法时修改第一个计数器,调用 Wait() 方法时增加第二个计数器,表示阻塞的协程数
    state2 uint32     // 信号量
}

sync.noCopy 是一个特殊的私有结构体,tools/go/analysis/passes/copylock 包中的分析器会在编译期间检查被拷贝的变量中是否包含 sync.noCopy 或者实现了 LockUnlock 方法,如果包含该结构体或者实现了对应的方法就会报出以下错误:copies lock value: sync.WaitGroup

接口

sync.WaitGroup 对外暴露了三个方法:sync.WaitGroup.Addsync.WaitGroup.Waitsync.WaitGroup.Done

因为其中的 sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法传入了 -1,所以我们重点分析另外两个方法,即 sync.WaitGroup.Addsync.WaitGroup.Wait

Add() 方法做两件事:

  1. 增加或扣减计数器(Add(正数) 时增加,Done() 时扣减)

  2. 扣减计数器后检查计数器,在计数器扣减为 0 时唤醒所有睡眠的协程

func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state() 
    state := atomic.AddUint64(statep, uint64(delta)<<32) // 将传入的 delta 参数加到高32位,即第一个计数器中。由于结构体里没有锁,因此使用 atomic
    v := int32(state >> 32) // 取高32位,第一个计数器
    w := uint32(state)      // 转为uint32取低32位,第二个计数器,表示阻塞的协程数
    ...
    // 计数器1 >0,或者没有协程调用 Wait() 方法时,可直接返回
    if v > 0 || w == 0 {
        return
    }
  
    // 计数器1 ==0 且计数器2 >0,唤醒睡眠的各个协程
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

Wait() 方法只做一件事:增加计数器并让协程进入休眠

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32) // 取高32位,第一个计数器
        // 计数器1 == 0时,可直接返回
        if v == 0 {
            return
        }
        // 否则给计数器2自增并睡眠
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
        // 陷入睡眠
            runtime_Semacquire(semap)
            return
        }
    }
}

小结

Last updated