2-1.runtime.sema信号量

sync 包下的结构体或多或少都使用了信号量机制,因此先介绍信号量

信号量

信号量是并发编程中常见的一种同步机制,在需要控制访问资源的线程数量时就会用到信号量。其实信号量就是一种变量或者抽象数据类型,用于控制并发系统中多个进程对公共资源的访问,访问具有原子性。信号量主要分为两类:

  • 二进制信号量 binary semaphore:其值只有两种 0 或者 1,相当于互斥量,当值为 1 时资源可用,当值为 0 时,资源被锁住,进程阻塞无法继续执行。

    • Linux 系统中,二进制信号量又称互斥锁 Mutex

  • 计数信号量 / 一般信号量 counting semaphore / general semaphore:信号量是一个任意的整数,起始时,如果计数器的计数值为 0,那么创建出来的信号量就是不可获得的状态,如果计数器的计数值大于 0,那么创建出来的信号量就是可获得的状态,并且总共获取的次数等于计数器的值。

信号量是由操作系统来维护的,信号量只能进行两种操作:等待和发送信号,即 PV 操作:

  • P 原语:P 是荷兰语 Proberen (测试) 的首字母。为阻塞原语,负责把当前进程由运行状态转换为阻塞状态,直到另外一个进程唤醒它。操作为:申请一个空闲资源 (把信号量减1),若成功,则退出;若失败,则该进程被阻塞;

  • V 原语:V 是荷兰语 Verhogen (增加) 的首字母。为唤醒原语,负责把一个被阻塞的进程唤醒,它有一个参数表,存放着等待被唤醒的进程信息。操作为:释放一个被占用的资源 (把信号量加1),如果发现有被阻塞的进程,则选择一个唤醒之。

在信号量进行 PV 操作时都为原子操作,并且在 PV 原语执行期间不允许有中断的发生。

PV 原语对信号量的操作可以分为三种情况:

  • 把信号量视为某种类型的共享资源的剩余个数,实现对一类共享资源的访问

  • 视信号量为一个加锁标志,实现对一个共享变量的访问

  • 把信号量用作进程间的同步

运行方式:

  1. 初始化信号量,给与它一个非负数的整数值。

  2. 运行 P,即 wait,信号量 S 的值将被减少。企图进入临界区的进程(必须申请锁的进程),需要先运行 wait()。当信号量 S 减为负值时,进程会被阻塞住,不能继续;当信号量 S 不为负值时,进程可以获准进入临界区。

  3. 运行 V,即 signal,信号量 S 的值会被增加。结束离开临界区的进程(比如释放锁的进程),将会运行 signal(),释放信号量。当信号量 S 不为负值时,先前被阻塞住的其他进程,将可获准进入临界区。

我们一般用信号量保护一组资源,比如数据库连接池、一组客户端的连接等等。每次获取资源时都会将信号量中的计数器减去对应的数值,在释放资源时重新加回来。当遇到信号量资源不够时尝试获取的线程就会进入休眠,等待其他线程释放归还信号量。如果信号量是只有0和1的二进位信号量,那么,它的 P/V 就和互斥锁的 Lock/Unlock 一样了。

Go 中的 sema

semaGo 实现的一个类似于 Futex 的信号量库,用于阻塞和唤醒协程。主要包含三个字段:一个整数用作锁,一个树堆作为队列存储阻塞的协程,和一个整数存储阻塞的协程数量

// runtime/sema.go
type semaRoot struct {
    lock  mutex  // 一个int
    treap *sudog // 一棵树堆的根节点,存储阻塞在这个锁上的协程
    nwait uint32 // 二叉树上协程的数量
}

// runtime/runtime2.go
type sudog struct {
    g *g
    parent   *sudog
}

树堆:普通的二叉搜索树可能存在平衡问题(一边深一边浅,甚至可能退化为一个链表),因此给数上的每个节点随机赋一个权重,根据这个权重旋转二叉树,把它调整为一个最大或最小堆

为啥用树堆而不是链表或数组做队列存储等待的协程呢?

因为要插入的协程可能已经存在于队列里了,为了快速查找插入位置,树里按照地址排序

PV 相关的函数为:

  1. semacquire1() 加锁时调用,对应P操作。申请资源,阻塞休眠当前协程

  2. semrelease1() 解锁时调用,对应V操作。释放资源,唤醒等待的协程

核心逻辑

这里简单看下 semacquire1() 函数的逻辑:

  1. 先 CAS 修改信号量,修改成功则视为成功获取资源

  2. 否则将当前协程放入树里,并使用 gopark 休眠当前协程

// sync.Mutex 锁结构里调用了 semacquire1 函数,入参为锁的 sema 字段的地址
func (m *Mutex) lockSlow() {
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
}


// 申请加锁
// P(S)操作。申请资源、减信号量。如果申请不到资源,则阻塞当前G,等待释放
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
    // 里面使用 CAS 先尝试修改 addr 地址上的变量
    if cansemacquire(addr) {
        return
    }

    s := acquireSudog()

    for {
        // 增加等待者数量,用于释放资源时快速判断
        atomic.Xadd(&root.nwait, 1)
        // 将协程放入树里
        root.queue(addr, s, lifo)
        // 休眠当前协程,让出 M
        goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
        
        // 恢复运行 判断是否可以拿到信号量 
        if s.ticket != 0 || cansemacquire(addr) {
            break
        }
    }
}

Last updated