2-8.sync.Semaphore的实现

Go 语言的 sync 包中提供了带权重的信号量 semaphore.Weight,能够控制并发访问的资源数量,比如协程数。先看个例子:

func main() {
    urls := []string{"1", "2", "3", "4"}

    s := semaphore.NewWeighted(3)               // 声明配额为3的信号量
    var w sync.WaitGroup
    for _, u := range urls {
        w.Add(1)
        go func(u string) {
            s.Acquire(context.Background(), 1)  // 占用1个信号量资源
            doSomething(u)
            s.Release(Weight)                   // 解除信号量占用
            w.Done()
        }(u)
    }
    w.Wait()
    
    fmt.Println("All Done")
}

这样就可以限制最多3个协程在跑了。

实现原理

这个结构体对外暴露了四个方法:

  • NewWeighted() 用于创建新的信号量

  • Acquire() 阻塞地获取指定权重的资源,如果当前没有空闲资源,会陷入休眠等待,即 P 操作

  • TryAcquire() 非阻塞地获取指定权重的资源,如果当前没有空闲资源,会直接返回 false

  • Release() 用于释放指定权重的资源,即 V 操作

Acquire() 方法用途类似于 Mutex 结构体的 Lock() ,都是申请资源、加锁,但是可以 Acquire 指定数量的资源;Release() 方法类似于 Unlock(),都是释放资源、解锁。

我们来看一下信号量 semaphore.Weighted 的数据结构:

type Weighted struct {
    size    int64         // 最大资源数
    cur     int64         // 当前已被使用的资源
    mu      sync.Mutex    // 互斥锁,对字段的保护
    waiters list.List     // 等待的调用者队列,先进先出
}

type waiter struct {
	n     int64           // 需要占用的配额
	ready chan<- struct{} // 用于唤醒调用者
}

Acquire 获取信号量

func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock() // 加锁保护临界区
   
    // 1. 有剩余配额,则扣减配额后直接返回
    if s.size-s.cur >= n && s.waiters.Len() == 0 { 
        s.cur += n
        s.mu.Unlock()
        return nil
    }
  
    // 2. 请求的资源数大于能提供的最大的资源数,这个任务处理不了,报错并返回
    if n > s.size { 
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
    }
    
    // 3. 现存资源不够, 需要把调用者加入到等待队列中
    ready := make(chan struct{})    // 创建了一个 chan, 这样可以通过 close(chan) 的方式对其通知
    w := waiter{n: n, ready: ready} 
    elem := s.waiters.PushBack(w)   // 把 w 放到队尾
    s.mu.Unlock()
  
    // 等待其他协程释放资源后唤醒
    select {
    ...
    case <-ready: // 等待者被唤醒了
        return nil
    }
  }

TryAcquire

其实就是 Acquire 代码里的逻辑1,去除了等待的逻辑。

Release 归还信号量资源

Release 方法会将占用资源放回,并调用 notifyWaiters 方法,唤醒等待队列中的调用者

func (s *Weighted) Release(n int64) {
    s.mu.Lock()
    s.cur -= n        // 释放信号量
    s.notifyWaiters() // 通知等待者
    s.mu.Unlock()
}

notifyWaiters 通知等待者

notifyWaiters 方法会逐个检查队列里等待的调用者,如果现存资源够等待者请求的数量n,或者是没有等待者了,就返回。

func (s *Weighted) notifyWaiters() {
    for { // 遍历所有 waiter
        next := s.waiters.Front()
        if next == nil {
            break // 没有等待者了,直接返回
        }
  
        w := next.Value.(waiter)
        if s.size-s.cur < w.n {
            // 如果现有资源不够队列头调用者请求的资源数,比如配额还剩10,但要占用11,就退出。所有等待者会继续等待
            // 这里还是按照先入先出的方式处理是为了避免饥饿
            break
        }

        s.cur += w.n
        s.waiters.Remove(next)
        close(w.ready) // close(chan),唤醒等待者
    }
}

notifyWaiters 方法是按照先入先出的方式唤醒调用者。当释放 100 个资源的时候,如果第一个等待者需要 101 个资源,那么,队列中的所有等待者都会继续等待,即使队列后面有的等待者只需要 1 个资源。这样做的目的是避免饥饿,否则的话,资源可能总是被那些请求资源数小的调用者获取,这样一来,请求资源数巨大的调用者,就没有机会获得资源了。

总结

Go 语言中信号量有时候也会被 Channel 类型所取代,因为一个 buffered chan 也可以代表 n 个资源。不过既然 Go 语言通过golang.orgx/sync 扩展库对外提供了 semaphore.Weight 这一种信号量实现,遇到使用信号量的场景时还是尽量使用官方提供的实现。在使用的过程中我们需要注意以下的几个问题:

  • AcquireTryAcquire 方法都可以用于获取资源,前者会阻塞地获取信号量。后者会非阻塞地获取信号量,如果获取不到就返回 false

  • Release 归还信号量后,会以先进先出的顺序唤醒队列中的等待者。如果现有资源不够队头的调用者请求的资源数,所有等待者会继续等待。

  • 如果一个 goroutine 申请较多的资源,由于上面说的归还后唤醒等待者的策略,它可能会等待比较长的时间。

参考

KevinYan11 - 信号量的使用方法和其实现原理

Last updated