2-6.sync.Cond

sync.Cond 是 Go 语言对条件锁的实现。它可以让一组 Goroutine 在不满足条件时阻塞等待,满足条件时被唤醒。

结构体

内嵌了 Locker 接口(一般用 Mutex 结构作为 Locker 就可以),和一个链表作为等待者队列。

type Cond struct {
    noCopy  noCopy       // 用于保证结构体不会在编译期间拷贝
    checker copyChecker  // 用于禁止运行期间发生的拷贝
    L       Locker       // 接口,需要实现 Lock 和 Unlock 方法,一般使用 Mutex 作为 Locker 即可
    notify  notifyList   // 等待的协程链表
}

// 这个 wait 和 notify 字段是只增不减的,相当于自增ID。不是很理解为什么有链表了还需要索引
type notifyList struct {
    wait uint32     // 正在等待的协程索引
    notify uint32   // 已经通知到的协程索引

    lock mutex
    head *sudog
    tail *sudog
}

使用方法

var status int64

func main() {
    m := &sync.Mutex{}
    c := sync.NewCond(m) // 传入一个锁作为 Cond 的参数
  
    // 起10个监听协程
    for i := 0; i < 10; i++ {
        go listen(c)
    }
    
    // 1秒后起一个协程广播
    // time.Sleep(1 * time.Second)
    go broadcast(c)

    // 阻塞主协程,方便观察输出
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt) // 监听系统信号
    <-ch
}

func broadcast(c *sync.Cond) {
    c.L.Lock()
    atomic.StoreInt64(&status, 1)
    c.Broadcast() // Broadcast() 方法会唤醒所有等待的协程
    c.L.Unlock()
}

func listen(c *sync.Cond) {
    c.L.Lock()
    for atomic.LoadInt64(&status) != 1 {
        c.Wait() // Wait() 方法会释放锁,并将当前协程放入等待者列表中
    }
    fmt.Println("listen")
    c.L.Unlock()
}

这个程序会在1秒后输出10个 "listen"

接口

对外提供三个接口:Wait(), Signal() 与 Broadcast()

Wait()

Wait() 方法会释放当前协程拿到的锁,并将当前协程放到等待者队列里

func (c *Cond) Wait() {
    c.checker.check()                     // 检查是否发生运行时拷贝
    t := runtime_notifyListAdd(&c.notify) // 等待者计数器+1,返回索引
    c.L.Unlock()                          // 释放锁
    runtime_notifyListWait(&c.notify, t)  // 将当前协程放入等待者队列,并给它记上索引
    c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
    return atomic.Xadd(&l.wait, 1) - 1    // 等待者计数器+1,返回索引
}

func notifyListWait(l *notifyList, t uint32) {
    s := acquireSudog() 
    s.g = getg()           // 获取当前协程
    s.ticket = t           // 赋值索引
    if l.tail == nil {
        l.head = s         // 等待者队列为空,则把当前协程设为链表表头
    } else {
        l.tail.next = s    // 否则放入队尾
    }
    l.tail = s
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3) // 休眠当前协程,让出当前处理器的使用权并等待调度器的唤醒
    releaseSudog(s)
}

Signal() 与 Broadcast()

Signal()Broadcast() 就是用来唤醒陷入休眠的 Goroutine 的方法

  • Signal() 方法会唤醒队列最前面的协程

  • Broadcast() 方法会唤醒队列中全部的协程

// 唤醒队列最前面的协程
func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

// 唤醒队列全部协程
func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

// 遍历等待者队列,取第一个协程出来唤醒
func notifyListNotifyOne(l *notifyList) {
    t := l.notify
    atomic.Store(&l.notify, t+1) // 更新当前遍历到的索引下标

    // 遍历链表,找到 notify 字段表示的索引位置上的协程。不是很理解为什么不直接取表头元素
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n // 将协程从链表中剔除
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            s.next = nil
            readyWithTime(s, 4) // 唤醒
            return
        }
    }
}

// 遍历等待者队列,全部唤醒
func notifyListNotifyAll(l *notifyList) {
    s := l.head
    l.head = nil
    l.tail = nil

    atomic.Store(&l.notify, atomic.Load(&l.wait)) // 更新 notify 索引进度

    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}

小结

sync.Cond 不是一个常用的同步机制,但是在条件长时间无法满足时,与使用 for {} 进行忙碌等待相比,sync.Cond 能够让出处理器的使用权,提高 CPU 的利用率。使用时我们也需要注意以下问题:

  • sync.Cond.Wait 在调用之前一定要传入互斥锁作为参数,否则会触发程序崩溃;

  • sync.Cond.Signal 唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;

  • sync.Cond.Broadcast 会按照一定顺序广播通知等待的全部 Goroutine;

Last updated