1.上下文Context的实现

1. 常用场景

  1. 并发安全的传递上下文,如传递 trace_id、kv 结构等

  2. 树状并发情景中的协程控制(某层任务取消后,及时停止所有的下层任务,上层不受影响;减少额外资源的消耗,减少处理时长)

  3. 超时控制

1.1 使用例子

func main() {
    ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) // 1s的超时
	defer cancel1()  // 保证返回前调用一次 cacel。这样比如主协程 panic 时,就可以调用到 cancel 函数,释放子协程资源

	req1, _ := http.NewRequestWithContext(ctx1, http.MethodGet, "", nil)
	go A(req1)
}

func A(r http.Request) {
    ... // 一些耗时的处理逻辑
    B(r)
}

func B(r http.Request) {
    select {
    // B这里应当先检查是否超时。如果超时,则提现返回,减少链路整理时长
    case <-ctx.Done():  
        return
    default:
    }
}

2. 底层结构

context.Context 是 Go 语言在 1.7 版本中引入标准库的接口,该接口定义了四个需要实现的方法,其中包括:

// 其实是个接口
type Context interface {
    Deadline() (deadline time.Time, ok bool) // 返回 context 被取消的时间,也就是完成工作的截止日期
    Done() <-chan struct{}   // 被取消或者超时时候返回的一个close的channel,子函数需要对其监听。(像全局广播,因为是close的,所有协程都能收到)
    Err() error              // 返回被取消或超时的原因
    Value(key interface{}) interface{} // 用于保存kv;树状结构:如果当前节点上无法找到key所对应的值,就会向上去父节点里找,直到根节点
}


// A canceler is a context type that can be canceled directly. The implementations are *cancelCtx and *timerCtx.
type canceler interface {
      cancel(removeFromParent bool, err error)
      Done() <-chan struct{}
}

为啥不将这里的 canceler 接口与 Context 接口合并,而是分成两个方法呢?他们定义的方法中都有 Done 方法。可以解释得通的说法是,源码作者认为 cancel 方法并不是 Context 必须的,根据最小接口设计原则,将两者分开。像 emptyCtxvalueCtx 不是可取消的,所以他们只要实现 Context 接口即可。

库里头提供了4个 Context 实现,来供大家玩耍

// 完全空的Context,用作树的根节点,其实是个int
type emptyCtx int

// 继承自Context,并实现了canceler接口
type cancelCtx struct {
    Context
    mu    sync.Mutex
    done chan struct{}
    
    children map[canceler]struct{} // 记录可取消的孩子节点
    err error
}

// 继承自cancelCtx,加了个timer,以支持超时处理
type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.
    deadline time.Time
}

// 继承自Context,加了个kv
// 携带kv信息,为全链路提供线索,比如接入elk等系统,需要来一个trace_id,valueCtx就非常适合做这个事
type valueCtx struct {
    Context
    key, val interface{}
}

3. 具体实现

cancelCtx

cancel方法的实现

其实主要就做了一件事:close(c.done)c.done 这个 channel 关闭掉,这样所有通过 select 方法监听 <- c.Done() 的协程都会收到通知

// closedchan is a reusable closed channel.
var closedchan = make(chan struct{})

func init() {
    close(closedchan)
}


func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    c.mu.Lock() // 加锁
    c.err = err // 设置取消原因
    if c.done == nil {
        c.done = closedchan
    } else {
        close(c.done) // 主要逻辑:关闭 c.done 这个 channel
    }
    for child := range c.children { // 将子节点依次取消
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()
}

WithCancel

// 新建一个cancel context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    c := newCancelCtx(parent)
    propagateCancel(parent, &c) // 将新建的 c 加到其父节点的 child 列表里。当父上下文被取消时,子上下文也会被取消:
    return &c, func() { c.cancel(true, Canceled) }
}

func propagateCancel(parent Context, child canceler) {
    // parentCancelCtx这个函数会从parent开始向上找,直到找到一个可取消的context为止
    // 即找到parent(含)最近的可取消祖先节点
    if p, ok := parentCancelCtx(parent); ok { 
        p.mu.Lock()
        if p.children == nil {
            p.children = make(map[canceler]struct{})
        }
        p.children[child] = struct{}{} // 将child放到这个祖先节点的children列表里,以便祖先取消时能一次调用child的cancel方法
        p.mu.Unlock()
    } else {
        // 如果没找到可用的祖先节点,就新起一个协程,监听父节点的Done事件
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}

// 用作查祖先节点时的key,其值为0没用,用的时候都是取址(见下面函数,把 &cancelCtxKey 这个地址当成 key 来用了
var cancelCtxKey int

// 找到parent(含)最近的可取消祖先节点
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
    if !ok {
        return nil, false
    }
    return p, true
}

valueCtx

// 根据key查找value,如果找不到则递归从父节点向上找
func (c *cancelCtx) Value(key interface{}) interface{} {
    if key == &cancelCtxKey {
        return c
    }
    return c.Context.Value(key) // 递归向上找
}

timerCtx

WithTimeout & WithDeadline

WithDeadline 内置了 timer 定时器,到期就执行 cancel

Withtimeout 实际上是 Withdeadline 的语法糖,使用 timeout 参数算了下 deadline 而已

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    return WithDeadline(parent, time.Now().Add(timeout))
}

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    c := &timerCtx{
        cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
    propagateCancel(parent, c)
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.err == nil {
        // 这里设了 timer 定时器,到期执行 c.cancel 方法
        c.timer = time.AfterFunc(dur, func() {
            c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}

4. net/http中的实际应用

  1. 首先 Server 在开启服务时会创建一个 valueCtx, 存储了server的相关信息,之后每建立一条连接就会开启一个协程,并携带此valueCtx

func (srv *ServerServe(l net.Listenererror {    
    baseCtx := context.Background() 
  
    ctx := context.WithValue(baseCtx, ServerContextKey, srv)    // ServerContextKey = "http-server"
    for {        
        rw, e := l.Accept()        
:= srv.newConn(rw)
        go c.serve(ctx)  // 开启新协程处理连接,并传递ctx
    }
}
  1. 建立连接之后会基于传入的context创建一个valueCtx用于存储本地地址信息,之后在此基础上又创建了一个cancelCtx,然后开始从当前连接中读取网络请求,每当读取到一个请求则会将该cancelCtx传入,用以传递取消信号。一旦连接断开,即可发送取消信号,取消所有进行中的网络请求。

// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
    ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr()) // 本地地址
  
    ctx, cancelCtx := context.WithCancel(ctx)
    c.cancelCtx = cancelCtx
    defer cancelCtx()
  
    for {
        w, err := c.readRequest(ctx)
        serverHandler{c.server}.ServeHTTP(w, w.req)        
    }
}
  1. 读取到请求之后,会再次基于传入的context创建新的cancelCtx,并设置到当前请求对象req上,同时生成的response对象中cancelCtx保存了当前context取消的方法。

func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
    req, err := readRequest(c.bufr, keepHostHeader)
    ctx, cancelCtx := context.WithCancel(ctx)
    req.ctx = ctx

    w = &response{
        conn:          c,
        cancelCtx:     cancelCtx,
        req:           req,
        reqBody:       req.Body
    }
    return w, nil
}

在整个server处理流程中,使用了一条 context 链贯穿ServerConnectionRequest,不仅将上游的信息共享给下游任务,同时实现了上游可发送取消信号取消所有下游任务,而下游任务自行取消不会影响上游任务。

另外有两点值得注意:

  1. context 的取消是柔性的,上游任务仅仅使用 context 通知下游不再需要,不会直接干涉和中断下游任务的执行,由下游任务自行处理取消逻辑,下游自己写 select <- ctx.Done() 的逻辑。

  2. context 是线程安全的,其本身是不可变的(immutable),且并发情况下会加锁。

参考

hudingyu - 深入理解Golang之context

咖啡色的羊驼 - 由浅入深聊聊Golang的context

Last updated