diva-notes
  • README
  • Ads
    • 定价策略
    • 广告层级
    • 归因模型
    • 买量
    • Chat GPT
    • Google
  • AI
    • 参考资料
    • Chat GPT
    • stable-diffusion-webui安装
  • Algorithm
    • 倍增
    • 并查集
    • 参考
    • 环的判断
    • 凸包
    • 蓄水池抽样
    • 最短路径
    • 最小生成树
    • KMP算法
    • Rabin-Karp算法
    • Tarjan桥算法
  • Architecture
    • Serverless
  • Career
  • CICD
    • 代码质量
    • CICD实践
  • Data Structure
    • 布谷鸟过滤器
    • 布隆过滤器
    • 浮点
    • 红黑树
    • 锁
    • LSM树
  • DB
    • My SQL
      • 隔离级别
      • 架构
      • 索引
      • 锁
      • 页结构
      • 主从同步
      • ACID
      • Log
      • MVCC
      • Questions
    • Postgres
      • 持久化
      • 对比MySQL
      • 隔离级别
      • 索引
      • Greenpulm
      • MVCC
    • 倒排索引
    • 列式存储
    • H Base
    • HDFS
    • MPP数据库选型
    • Questions
  • Distributed System
    • 分布式事务
    • 服务网格
    • BASE理论
    • CAP
    • Etcd
    • Raft协议
    • ZAB协议
  • Go
    • 1.语言基础
      • 1.CPU寄存器
      • 2-1.函数调用
      • 2-2.函数调用栈
      • 2.接口
      • 3.汇编
      • 4.调试
    • 2.编译
      • 1.编译
      • 2.词法与语法分析
      • 3.类型检查
      • 4.中间代码生成
      • 5.机器码生成
    • 3.数据结构
      • 1.数组array
      • 2.切片slice
      • 3.哈希表map
      • 4.字符串
    • 4.常用关键字
      • 1.循环
      • 2.defer
      • 3.panic和recover
      • 4.make和new
    • 5.并发编程
      • 1.上下文Context的实现
      • 2-1.runtime.sema信号量
      • 2-2.sync.Mutex的实现
      • 2-3.sync.WaitGroup
      • 2-4.sync.Once的实现
      • 2-5.sync.Map的实现
      • 2-6.sync.Cond
      • 2-7.sync.Pool的实现
      • 2-8.sync.Semaphore的实现
      • 2-9.sync.ErrGroup
      • 3.定时器Timer的实现
      • 4.Channel的实现
      • 5-1.调度-线程
      • 5-2.调度-MPG
      • 5-3.调度-程序及调度启动
      • 5-4.调度-调度策略
      • 5-5.调度-抢占
      • 6.netpoll实现
      • 7.atomic
    • 6.内存管理
      • 1-1.内存分配基础-TCmalloc
      • 1-2.内存分配
      • 2.垃圾回收
      • 3.栈内存管理
    • 参考
    • 各版本特性
    • 坑
    • Go程序性能优化
    • http.Client
    • net.http路由
    • profile采样的实现
    • Questions
    • time的设计
  • Kafka
    • 高可用
    • 架构
    • 消息队列选型
    • ISR
    • Questions
  • Network
    • ARP
    • DNS
    • DPVS
    • GET和POST
    • HTTP 2
    • HTTP 3
    • HTTPS
    • LVS的转发模式
    • NAT
    • Nginx
    • OSI七层模型
    • Protobuf
    • Questions
    • REST Ful
    • RPC
    • socket缓冲区
    • socket详解
    • TCP滑动窗口
    • TCP连接建立源码
    • TCP连接四元组
    • TCP三次握手
    • TCP数据结构
    • TCP四次挥手
    • TCP拥塞控制
    • TCP重传机制
    • UDP
  • OS
    • 磁盘IO
    • 调度
    • 进程VS线程
    • 零拷贝
    • 内存-虚拟内存
    • 内存分配
    • 用户态VS内核态
    • 中断
    • COW写时复制
    • IO多路复用
    • Questions
  • Redis
    • 安装
    • 参考
    • 高可用-持久化
    • 高可用-主从同步
    • 高可用-Cluster
    • 高可用-Sentinel
    • 缓存一致性
    • 事务
    • 数据结构-SDS
    • 数据结构-Skiplist
    • 数据结构-Ziplist
    • 数据结构
    • 数据类型-Hashtable
    • 数据类型-List
    • 数据类型-Set
    • 数据类型-Zset
    • 数据淘汰机制
    • 通信协议-RESP
    • Questions
    • Redis6.0多线程
    • Redis分布式锁
    • Redis分片
  • System Design
    • 本地缓存
    • 错误处理
    • 大文件处理
    • 点赞收藏关注
    • 短链接生成系统
    • 负载均衡
    • 高并发高可用
    • 规则引擎
    • 集卡活动
    • 秒杀系统
    • 评论系统
    • 熔断
    • 限流
    • 延迟队列
    • Docker
    • ES
    • K 8 S
    • Node.js
    • Questions
  • Work
    • Bash
    • Charles
    • Code Review
    • Ffmpeg
    • Git
    • intellij插件
    • I Term 2
    • Mac
    • mysql命令
    • Nginx
    • postgresql命令
    • Protoc
    • Ssh
    • Systemd
    • Tcp相关命令
    • Vim
Powered by GitBook
On this page
  • 实现原理
  • 总结
  1. Go
  2. 5.并发编程

2-8.sync.Semaphore的实现

Go 语言的 sync 包中提供了带权重的信号量 semaphore.Weight,能够控制并发访问的资源数量,比如协程数。先看个例子:

func main() {
    urls := []string{"1", "2", "3", "4"}

    s := semaphore.NewWeighted(3)               // 声明配额为3的信号量
    var w sync.WaitGroup
    for _, u := range urls {
        w.Add(1)
        go func(u string) {
            s.Acquire(context.Background(), 1)  // 占用1个信号量资源
            doSomething(u)
            s.Release(Weight)                   // 解除信号量占用
            w.Done()
        }(u)
    }
    w.Wait()
    
    fmt.Println("All Done")
}

这样就可以限制最多3个协程在跑了。

实现原理

这个结构体对外暴露了四个方法:

  • NewWeighted() 用于创建新的信号量

  • Acquire() 阻塞地获取指定权重的资源,如果当前没有空闲资源,会陷入休眠等待,即 P 操作

  • TryAcquire() 非阻塞地获取指定权重的资源,如果当前没有空闲资源,会直接返回 false

  • Release() 用于释放指定权重的资源,即 V 操作

其 Acquire() 方法用途类似于 Mutex 结构体的 Lock() ,都是申请资源、加锁,但是可以 Acquire 指定数量的资源;Release() 方法类似于 Unlock(),都是释放资源、解锁。

我们来看一下信号量 semaphore.Weighted 的数据结构:

type Weighted struct {
    size    int64         // 最大资源数
    cur     int64         // 当前已被使用的资源
    mu      sync.Mutex    // 互斥锁,对字段的保护
    waiters list.List     // 等待的调用者队列,先进先出
}

type waiter struct {
	n     int64           // 需要占用的配额
	ready chan<- struct{} // 用于唤醒调用者
}

Acquire 获取信号量

func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    s.mu.Lock() // 加锁保护临界区
   
    // 1. 有剩余配额,则扣减配额后直接返回
    if s.size-s.cur >= n && s.waiters.Len() == 0 { 
        s.cur += n
        s.mu.Unlock()
        return nil
    }
  
    // 2. 请求的资源数大于能提供的最大的资源数,这个任务处理不了,报错并返回
    if n > s.size { 
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
    }
    
    // 3. 现存资源不够, 需要把调用者加入到等待队列中
    ready := make(chan struct{})    // 创建了一个 chan, 这样可以通过 close(chan) 的方式对其通知
    w := waiter{n: n, ready: ready} 
    elem := s.waiters.PushBack(w)   // 把 w 放到队尾
    s.mu.Unlock()
  
    // 等待其他协程释放资源后唤醒
    select {
    ...
    case <-ready: // 等待者被唤醒了
        return nil
    }
  }

TryAcquire

其实就是 Acquire 代码里的逻辑1,去除了等待的逻辑。

Release 归还信号量资源

Release 方法会将占用资源放回,并调用 notifyWaiters 方法,唤醒等待队列中的调用者

func (s *Weighted) Release(n int64) {
    s.mu.Lock()
    s.cur -= n        // 释放信号量
    s.notifyWaiters() // 通知等待者
    s.mu.Unlock()
}

notifyWaiters 通知等待者

notifyWaiters 方法会逐个检查队列里等待的调用者,如果现存资源够等待者请求的数量n,或者是没有等待者了,就返回。

func (s *Weighted) notifyWaiters() {
    for { // 遍历所有 waiter
        next := s.waiters.Front()
        if next == nil {
            break // 没有等待者了,直接返回
        }
  
        w := next.Value.(waiter)
        if s.size-s.cur < w.n {
            // 如果现有资源不够队列头调用者请求的资源数,比如配额还剩10,但要占用11,就退出。所有等待者会继续等待
            // 这里还是按照先入先出的方式处理是为了避免饥饿
            break
        }

        s.cur += w.n
        s.waiters.Remove(next)
        close(w.ready) // close(chan),唤醒等待者
    }
}

notifyWaiters 方法是按照先入先出的方式唤醒调用者。当释放 100 个资源的时候,如果第一个等待者需要 101 个资源,那么,队列中的所有等待者都会继续等待,即使队列后面有的等待者只需要 1 个资源。这样做的目的是避免饥饿,否则的话,资源可能总是被那些请求资源数小的调用者获取,这样一来,请求资源数巨大的调用者,就没有机会获得资源了。

总结

在 Go 语言中信号量有时候也会被 Channel 类型所取代,因为一个 buffered chan 也可以代表 n 个资源。不过既然 Go 语言通过golang.orgx/sync 扩展库对外提供了 semaphore.Weight 这一种信号量实现,遇到使用信号量的场景时还是尽量使用官方提供的实现。在使用的过程中我们需要注意以下的几个问题:

  • Acquire 和 TryAcquire 方法都可以用于获取资源,前者会阻塞地获取信号量。后者会非阻塞地获取信号量,如果获取不到就返回 false。

  • Release 归还信号量后,会以先进先出的顺序唤醒队列中的等待者。如果现有资源不够队头的调用者请求的资源数,所有等待者会继续等待。

  • 如果一个 goroutine 申请较多的资源,由于上面说的归还后唤醒等待者的策略,它可能会等待比较长的时间。

参考

Previous2-7.sync.Pool的实现Next2-9.sync.ErrGroup

Last updated 2 years ago

KevinYan11 - 信号量的使用方法和其实现原理