Go 语言的 sync
包中提供了带权重的信号量 semaphore.Weight
,能够控制并发访问的资源数量,比如协程数。先看个例子:
Copy func main () {
urls := [] string { "1" , "2" , "3" , "4" }
s := semaphore. NewWeighted ( 3 ) // 声明配额为3的信号量
var w sync . WaitGroup
for _, u := range urls {
w. Add ( 1 )
go func (u string ) {
s. Acquire (context. Background (), 1 ) // 占用1个信号量资源
doSomething (u)
s. Release (Weight) // 解除信号量占用
w. Done ()
}(u)
}
w. Wait ()
fmt. Println ( "All Done" )
}
这样就可以限制最多3个协程在跑了。
实现原理
这个结构体对外暴露了四个方法:
Acquire()
阻塞地获取指定权重的资源,如果当前没有空闲资源,会陷入休眠等待,即 P
操作
TryAcquire()
非阻塞地获取指定权重的资源,如果当前没有空闲资源,会直接返回 false
Release()
用于释放指定权重的资源,即 V
操作
其 Acquire()
方法用途类似于 Mutex
结构体的 Lock()
,都是申请资源、加锁,但是可以 Acquire
指定数量的资源;Release()
方法类似于 Unlock()
,都是释放资源、解锁。
我们来看一下信号量 semaphore.Weighted
的数据结构:
Copy type Weighted struct {
size int64 // 最大资源数
cur int64 // 当前已被使用的资源
mu sync . Mutex // 互斥锁,对字段的保护
waiters list . List // 等待的调用者队列,先进先出
}
type waiter struct {
n int64 // 需要占用的配额
ready chan<- struct {} // 用于唤醒调用者
}
Acquire 获取信号量
Copy func (s * Weighted ) Acquire (ctx context . Context , n int64 ) error {
s.mu. Lock () // 加锁保护临界区
// 1. 有剩余配额,则扣减配额后直接返回
if s.size - s.cur >= n && s.waiters. Len () == 0 {
s.cur += n
s.mu. Unlock ()
return nil
}
// 2. 请求的资源数大于能提供的最大的资源数,这个任务处理不了,报错并返回
if n > s.size {
s.mu. Unlock ()
<- ctx. Done ()
return ctx. Err ()
}
// 3. 现存资源不够, 需要把调用者加入到等待队列中
ready := make ( chan struct {}) // 创建了一个 chan, 这样可以通过 close(chan) 的方式对其通知
w := waiter {n: n, ready: ready}
elem := s.waiters. PushBack (w) // 把 w 放到队尾
s.mu. Unlock ()
// 等待其他协程释放资源后唤醒
select {
...
case <- ready: // 等待者被唤醒了
return nil
}
}
TryAcquire
其实就是 Acquire
代码里的逻辑1,去除了等待的逻辑。
Release 归还信号量资源
Release
方法会将占用资源放回,并调用 notifyWaiters
方法,唤醒等待队列中的调用者
Copy func (s * Weighted ) Release (n int64 ) {
s.mu. Lock ()
s.cur -= n // 释放信号量
s. notifyWaiters () // 通知等待者
s.mu. Unlock ()
}
notifyWaiters 通知等待者
notifyWaiters
方法会逐个检查队列里等待的调用者,如果现存资源够等待者请求的数量n,或者是没有等待者了,就返回。
Copy func (s * Weighted ) notifyWaiters () {
for { // 遍历所有 waiter
next := s.waiters. Front ()
if next == nil {
break // 没有等待者了,直接返回
}
w := next.Value.( waiter )
if s.size - s.cur < w.n {
// 如果现有资源不够队列头调用者请求的资源数,比如配额还剩10,但要占用11,就退出。所有等待者会继续等待
// 这里还是按照先入先出的方式处理是为了避免饥饿
break
}
s.cur += w.n
s.waiters. Remove (next)
close (w.ready) // close(chan),唤醒等待者
}
}
notifyWaiters
方法是按照先入先出的方式唤醒调用者。当释放 100 个资源的时候,如果第一个等待者需要 101 个资源,那么,队列中的所有等待者都会继续等待,即使队列后面有的等待者只需要 1 个资源。这样做的目的是避免饥饿,否则的话,资源可能总是被那些请求资源数小的调用者获取,这样一来,请求资源数巨大的调用者,就没有机会获得资源了。
总结
在 Go
语言中信号量有时候也会被 Channel
类型所取代,因为一个 buffered chan
也可以代表 n 个资源。不过既然 Go
语言通过golang.orgx/sync
扩展库对外提供了 semaphore.Weight
这一种信号量实现,遇到使用信号量的场景时还是尽量使用官方提供的实现。在使用的过程中我们需要注意以下的几个问题:
Acquire
和 TryAcquire
方法都可以用于获取资源,前者会阻塞地获取信号量。后者会非阻塞地获取信号量,如果获取不到就返回 false
。
Release
归还信号量后,会以先进先出的顺序唤醒队列中的等待者。如果现有资源不够队头的调用者请求的资源数,所有等待者会继续等待。
如果一个 goroutine
申请较多的资源,由于上面说的归还后唤醒等待者的策略,它可能会等待比较长的时间。
参考
KevinYan11 - 信号量的使用方法和其实现原理