2-9.sync.ErrGroup

golang/sync/errgroup.Group 为我们在一组 Goroutine 中提供了同步、错误传播以及上下文取消的功能,我们可以使用如下所示的方式并行获取网页的数据:

var g errgroup.Group
var urls = []string{
    "http://www.golang.org/",
    "http://www.google.com/",
}

for i := range urls {
    url := urls[i]
    g.Go(func() error {
        resp, err := http.Get(url)
        if err == nil {
            resp.Body.Close()
        }
        return err
    })
}

if err := g.Wait(); err == nil {
    fmt.Println("Successfully fetched all URLs.")
}

golang/sync/errgroup.Group.Go 方法能够创建一个 Goroutine 并在其中执行传入的函数,而 golang/sync/errgroup.Group.Wait 会等待所有 Goroutine 全部返回,该方法的不同返回结果也有不同的含义:

  • 如果返回错误 — 这一组 Goroutine 最少返回一个错误;

  • 如果返回空值 — 所有 Goroutine 都成功执行

结构体

golang/sync/errgroup.Group 结构体同时由三个比较重要的部分组成:

  1. cancel : 创建 context.Context 时返回的取消函数,用于在多个 Goroutine 之间同步取消信号

  2. wg : 用于等待一组 Goroutine 完成子任务的同步原语

  3. errOnce : 用于保证只接收一个子任务返回的错误

type Group struct {
    cancel func()

    wg sync.WaitGroup

    errOnce sync.Once
    err     error
}

接口

WithContext(ctx): 新建一个带上下文的 Group 变量

func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    return &Group{cancel: cancel}, ctx
}

Go(f): 新建一个协程并执行函数

func (g *Group) Go(f func() error) {
    // 1. 调用 sync.WaitGroup.Add 增加待处理的任务
    g.wg.Add(1)

	// 2. 创建新的 Goroutine 并运行子任务
    go func() {
        defer g.wg.Done()

		// 3. 发生错误时,对 err 字段赋值并调用 cancel 函数
		// 只有最早返回的错误才会被上游感知到,后续的错误都会被舍弃
        if err := f(); err != nil {
            g.errOnce.Do(func() {
                g.err = err
                if g.cancel != nil {
                    g.cancel()
                }
            })
        }
    }()
}

Wait(): 等待所有协程执行完或被取消

func (g *Group) Wait() error {
    g.wg.Wait()
    if g.cancel != nil {
        g.cancel()
    }
    return g.err
}

原文链接

https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/#errgroup

Last updated