diva-notes
  • README
  • Ads
    • 定价策略
    • 广告层级
    • 归因模型
    • 买量
    • Chat GPT
    • Google
  • AI
    • 参考资料
    • Chat GPT
    • stable-diffusion-webui安装
  • Algorithm
    • 倍增
    • 并查集
    • 参考
    • 环的判断
    • 凸包
    • 蓄水池抽样
    • 最短路径
    • 最小生成树
    • KMP算法
    • Rabin-Karp算法
    • Tarjan桥算法
  • Architecture
    • Serverless
  • Career
  • CICD
    • 代码质量
    • CICD实践
  • Data Structure
    • 布谷鸟过滤器
    • 布隆过滤器
    • 浮点
    • 红黑树
    • 锁
    • LSM树
  • DB
    • My SQL
      • 隔离级别
      • 架构
      • 索引
      • 锁
      • 页结构
      • 主从同步
      • ACID
      • Log
      • MVCC
      • Questions
    • Postgres
      • 持久化
      • 对比MySQL
      • 隔离级别
      • 索引
      • Greenpulm
      • MVCC
    • 倒排索引
    • 列式存储
    • H Base
    • HDFS
    • MPP数据库选型
    • Questions
  • Distributed System
    • 分布式事务
    • 服务网格
    • BASE理论
    • CAP
    • Etcd
    • Raft协议
    • ZAB协议
  • Go
    • 1.语言基础
      • 1.CPU寄存器
      • 2-1.函数调用
      • 2-2.函数调用栈
      • 2.接口
      • 3.汇编
      • 4.调试
    • 2.编译
      • 1.编译
      • 2.词法与语法分析
      • 3.类型检查
      • 4.中间代码生成
      • 5.机器码生成
    • 3.数据结构
      • 1.数组array
      • 2.切片slice
      • 3.哈希表map
      • 4.字符串
    • 4.常用关键字
      • 1.循环
      • 2.defer
      • 3.panic和recover
      • 4.make和new
    • 5.并发编程
      • 1.上下文Context的实现
      • 2-1.runtime.sema信号量
      • 2-2.sync.Mutex的实现
      • 2-3.sync.WaitGroup
      • 2-4.sync.Once的实现
      • 2-5.sync.Map的实现
      • 2-6.sync.Cond
      • 2-7.sync.Pool的实现
      • 2-8.sync.Semaphore的实现
      • 2-9.sync.ErrGroup
      • 3.定时器Timer的实现
      • 4.Channel的实现
      • 5-1.调度-线程
      • 5-2.调度-MPG
      • 5-3.调度-程序及调度启动
      • 5-4.调度-调度策略
      • 5-5.调度-抢占
      • 6.netpoll实现
      • 7.atomic
    • 6.内存管理
      • 1-1.内存分配基础-TCmalloc
      • 1-2.内存分配
      • 2.垃圾回收
      • 3.栈内存管理
    • 参考
    • 各版本特性
    • 坑
    • Go程序性能优化
    • http.Client
    • net.http路由
    • profile采样的实现
    • Questions
    • time的设计
  • Kafka
    • 高可用
    • 架构
    • 消息队列选型
    • ISR
    • Questions
  • Network
    • ARP
    • DNS
    • DPVS
    • GET和POST
    • HTTP 2
    • HTTP 3
    • HTTPS
    • LVS的转发模式
    • NAT
    • Nginx
    • OSI七层模型
    • Protobuf
    • Questions
    • REST Ful
    • RPC
    • socket缓冲区
    • socket详解
    • TCP滑动窗口
    • TCP连接建立源码
    • TCP连接四元组
    • TCP三次握手
    • TCP数据结构
    • TCP四次挥手
    • TCP拥塞控制
    • TCP重传机制
    • UDP
  • OS
    • 磁盘IO
    • 调度
    • 进程VS线程
    • 零拷贝
    • 内存-虚拟内存
    • 内存分配
    • 用户态VS内核态
    • 中断
    • COW写时复制
    • IO多路复用
    • Questions
  • Redis
    • 安装
    • 参考
    • 高可用-持久化
    • 高可用-主从同步
    • 高可用-Cluster
    • 高可用-Sentinel
    • 缓存一致性
    • 事务
    • 数据结构-SDS
    • 数据结构-Skiplist
    • 数据结构-Ziplist
    • 数据结构
    • 数据类型-Hashtable
    • 数据类型-List
    • 数据类型-Set
    • 数据类型-Zset
    • 数据淘汰机制
    • 通信协议-RESP
    • Questions
    • Redis6.0多线程
    • Redis分布式锁
    • Redis分片
  • System Design
    • 本地缓存
    • 错误处理
    • 大文件处理
    • 点赞收藏关注
    • 短链接生成系统
    • 负载均衡
    • 高并发高可用
    • 规则引擎
    • 集卡活动
    • 秒杀系统
    • 评论系统
    • 熔断
    • 限流
    • 延迟队列
    • Docker
    • ES
    • K 8 S
    • Node.js
    • Questions
  • Work
    • Bash
    • Charles
    • Code Review
    • Ffmpeg
    • Git
    • intellij插件
    • I Term 2
    • Mac
    • mysql命令
    • Nginx
    • postgresql命令
    • Protoc
    • Ssh
    • Systemd
    • Tcp相关命令
    • Vim
Powered by GitBook
On this page
  • 1. 常用场景
  • 1.1 使用例子
  • 2. 底层结构
  • 3. 具体实现
  • cancelCtx
  • valueCtx
  • timerCtx
  • 4. net/http中的实际应用
  • 参考
  1. Go
  2. 5.并发编程

1.上下文Context的实现

1. 常用场景

  1. 并发安全的传递上下文,如传递 trace_id、kv 结构等

  2. 树状并发情景中的协程控制(某层任务取消后,及时停止所有的下层任务,上层不受影响;减少额外资源的消耗,减少处理时长)

  3. 超时控制

1.1 使用例子

func main() {
    ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) // 1s的超时
	defer cancel1()  // 保证返回前调用一次 cacel。这样比如主协程 panic 时,就可以调用到 cancel 函数,释放子协程资源

	req1, _ := http.NewRequestWithContext(ctx1, http.MethodGet, "", nil)
	go A(req1)
}

func A(r http.Request) {
    ... // 一些耗时的处理逻辑
    B(r)
}

func B(r http.Request) {
    select {
    // B这里应当先检查是否超时。如果超时,则提现返回,减少链路整理时长
    case <-ctx.Done():  
        return
    default:
    }
}

2. 底层结构

// 其实是个接口
type Context interface {
    Deadline() (deadline time.Time, ok bool) // 返回 context 被取消的时间,也就是完成工作的截止日期
    Done() <-chan struct{}   // 被取消或者超时时候返回的一个close的channel,子函数需要对其监听。(像全局广播,因为是close的,所有协程都能收到)
    Err() error              // 返回被取消或超时的原因
    Value(key interface{}) interface{} // 用于保存kv;树状结构:如果当前节点上无法找到key所对应的值,就会向上去父节点里找,直到根节点
}


// A canceler is a context type that can be canceled directly. The implementations are *cancelCtx and *timerCtx.
type canceler interface {
      cancel(removeFromParent bool, err error)
      Done() <-chan struct{}
}

为啥不将这里的 canceler 接口与 Context 接口合并,而是分成两个方法呢?他们定义的方法中都有 Done 方法。可以解释得通的说法是,源码作者认为 cancel 方法并不是 Context 必须的,根据最小接口设计原则,将两者分开。像 emptyCtx 和 valueCtx 不是可取消的,所以他们只要实现 Context 接口即可。

库里头提供了4个 Context 实现,来供大家玩耍

// 完全空的Context,用作树的根节点,其实是个int
type emptyCtx int

// 继承自Context,并实现了canceler接口
type cancelCtx struct {
    Context
    mu    sync.Mutex
    done chan struct{}
    
    children map[canceler]struct{} // 记录可取消的孩子节点
    err error
}

// 继承自cancelCtx,加了个timer,以支持超时处理
type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.
    deadline time.Time
}

// 继承自Context,加了个kv
// 携带kv信息,为全链路提供线索,比如接入elk等系统,需要来一个trace_id,valueCtx就非常适合做这个事
type valueCtx struct {
    Context
    key, val interface{}
}

3. 具体实现

cancelCtx

cancel方法的实现

其实主要就做了一件事:close(c.done) 把 c.done 这个 channel 关闭掉,这样所有通过 select 方法监听 <- c.Done() 的协程都会收到通知

// closedchan is a reusable closed channel.
var closedchan = make(chan struct{})

func init() {
    close(closedchan)
}


func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    c.mu.Lock() // 加锁
    c.err = err // 设置取消原因
    if c.done == nil {
        c.done = closedchan
    } else {
        close(c.done) // 主要逻辑:关闭 c.done 这个 channel
    }
    for child := range c.children { // 将子节点依次取消
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()
}

WithCancel

// 新建一个cancel context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    c := newCancelCtx(parent)
    propagateCancel(parent, &c) // 将新建的 c 加到其父节点的 child 列表里。当父上下文被取消时,子上下文也会被取消:
    return &c, func() { c.cancel(true, Canceled) }
}

func propagateCancel(parent Context, child canceler) {
    // parentCancelCtx这个函数会从parent开始向上找,直到找到一个可取消的context为止
    // 即找到parent(含)最近的可取消祖先节点
    if p, ok := parentCancelCtx(parent); ok { 
        p.mu.Lock()
        if p.children == nil {
            p.children = make(map[canceler]struct{})
        }
        p.children[child] = struct{}{} // 将child放到这个祖先节点的children列表里,以便祖先取消时能一次调用child的cancel方法
        p.mu.Unlock()
    } else {
        // 如果没找到可用的祖先节点,就新起一个协程,监听父节点的Done事件
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}

// 用作查祖先节点时的key,其值为0没用,用的时候都是取址(见下面函数,把 &cancelCtxKey 这个地址当成 key 来用了
var cancelCtxKey int

// 找到parent(含)最近的可取消祖先节点
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
    if !ok {
        return nil, false
    }
    return p, true
}

valueCtx

// 根据key查找value,如果找不到则递归从父节点向上找
func (c *cancelCtx) Value(key interface{}) interface{} {
    if key == &cancelCtxKey {
        return c
    }
    return c.Context.Value(key) // 递归向上找
}

timerCtx

WithTimeout & WithDeadline

WithDeadline 内置了 timer 定时器,到期就执行 cancel

Withtimeout 实际上是 Withdeadline 的语法糖,使用 timeout 参数算了下 deadline 而已

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    return WithDeadline(parent, time.Now().Add(timeout))
}

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    c := &timerCtx{
        cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
    propagateCancel(parent, c)
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.err == nil {
        // 这里设了 timer 定时器,到期执行 c.cancel 方法
        c.timer = time.AfterFunc(dur, func() {
            c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}

4. net/http中的实际应用

  1. 首先 Server 在开启服务时会创建一个 valueCtx, 存储了server的相关信息,之后每建立一条连接就会开启一个协程,并携带此valueCtx。

func (srv *Server) Serve(l net.Listener) error {    
    baseCtx := context.Background() 
  
    ctx := context.WithValue(baseCtx, ServerContextKey, srv)    // ServerContextKey = "http-server"
    for {        
        rw, e := l.Accept()        
        c := srv.newConn(rw)
        go c.serve(ctx)  // 开启新协程处理连接,并传递ctx
    }
}
  1. 建立连接之后会基于传入的context创建一个valueCtx用于存储本地地址信息,之后在此基础上又创建了一个cancelCtx,然后开始从当前连接中读取网络请求,每当读取到一个请求则会将该cancelCtx传入,用以传递取消信号。一旦连接断开,即可发送取消信号,取消所有进行中的网络请求。

// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
    ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr()) // 本地地址
  
    ctx, cancelCtx := context.WithCancel(ctx)
    c.cancelCtx = cancelCtx
    defer cancelCtx()
  
    for {
        w, err := c.readRequest(ctx)
        serverHandler{c.server}.ServeHTTP(w, w.req)        
    }
}
  1. 读取到请求之后,会再次基于传入的context创建新的cancelCtx,并设置到当前请求对象req上,同时生成的response对象中cancelCtx保存了当前context取消的方法。

func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
    req, err := readRequest(c.bufr, keepHostHeader)
    ctx, cancelCtx := context.WithCancel(ctx)
    req.ctx = ctx

    w = &response{
        conn:          c,
        cancelCtx:     cancelCtx,
        req:           req,
        reqBody:       req.Body
    }
    return w, nil
}

在整个server处理流程中,使用了一条 context 链贯穿Server、Connection、Request,不仅将上游的信息共享给下游任务,同时实现了上游可发送取消信号取消所有下游任务,而下游任务自行取消不会影响上游任务。

另外有两点值得注意:

  1. context 的取消是柔性的,上游任务仅仅使用 context 通知下游不再需要,不会直接干涉和中断下游任务的执行,由下游任务自行处理取消逻辑,下游自己写 select <- ctx.Done() 的逻辑。

  2. context 是线程安全的,其本身是不可变的(immutable),且并发情况下会加锁。

参考

Previous5.并发编程Next2-1.runtime.sema信号量

Last updated 2 years ago

是 Go 语言在 1.7 版本中引入标准库的接口,该接口定义了四个需要实现的方法,其中包括:

context.Context
hudingyu - 深入理解Golang之context
咖啡色的羊驼 - 由浅入深聊聊Golang的context