1. 常用场景
并发安全的传递上下文,如传递 trace_id、kv 结构等
树状并发情景中的协程控制(某层任务取消后,及时停止所有的下层任务,上层不受影响;减少额外资源的消耗,减少处理时长)
1.1 使用例子
Copy func main () {
ctx1, cancel1 := context. WithTimeout (context. Background (), time.Second) // 1s的超时
defer cancel1 () // 保证返回前调用一次 cacel。这样比如主协程 panic 时,就可以调用到 cancel 函数,释放子协程资源
req1, _ := http. NewRequestWithContext (ctx1, http.MethodGet, "" , nil )
go A (req1)
}
func A (r http . Request ) {
... // 一些耗时的处理逻辑
B (r)
}
func B (r http . Request ) {
select {
// B这里应当先检查是否超时。如果超时,则提现返回,减少链路整理时长
case <- ctx. Done ():
return
default :
}
}
2. 底层结构
context.Context
是 Go 语言在 1.7 版本中引入标准库的接口,该接口定义了四个需要实现的方法,其中包括:
Copy // 其实是个接口
type Context interface {
Deadline () (deadline time . Time , ok bool ) // 返回 context 被取消的时间,也就是完成工作的截止日期
Done () <-chan struct {} // 被取消或者超时时候返回的一个close的channel,子函数需要对其监听。(像全局广播,因为是close的,所有协程都能收到)
Err () error // 返回被取消或超时的原因
Value (key interface {}) interface {} // 用于保存kv;树状结构:如果当前节点上无法找到key所对应的值,就会向上去父节点里找,直到根节点
}
// A canceler is a context type that can be canceled directly. The implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel (removeFromParent bool , err error )
Done () <-chan struct {}
}
为啥不将这里的 canceler
接口与 Context
接口合并,而是分成两个方法呢?他们定义的方法中都有 Done
方法。可以解释得通的说法是,源码作者认为 cancel
方法并不是 Context
必须的,根据最小接口设计原则,将两者分开。像 emptyCtx
和 valueCtx
不是可取消的,所以他们只要实现 Context
接口即可。
库里头提供了4个 Context
实现,来供大家玩耍
Copy // 完全空的Context,用作树的根节点,其实是个int
type emptyCtx int
// 继承自Context,并实现了canceler接口
type cancelCtx struct {
Context
mu sync . Mutex
done chan struct {}
children map [ canceler ] struct {} // 记录可取消的孩子节点
err error
}
// 继承自cancelCtx,加了个timer,以支持超时处理
type timerCtx struct {
cancelCtx
timer * time . Timer // Under cancelCtx.mu.
deadline time . Time
}
// 继承自Context,加了个kv
// 携带kv信息,为全链路提供线索,比如接入elk等系统,需要来一个trace_id,valueCtx就非常适合做这个事
type valueCtx struct {
Context
key, val interface {}
}
3. 具体实现
cancelCtx
cancel方法的实现
其实主要就做了一件事:close(c.done)
把 c.done
这个 channel
关闭掉,这样所有通过 select
方法监听 <- c.Done()
的协程都会收到通知
Copy // closedchan is a reusable closed channel.
var closedchan = make ( chan struct {})
func init () {
close (closedchan)
}
func (c * cancelCtx ) cancel (removeFromParent bool , err error ) {
c.mu. Lock () // 加锁
c.err = err // 设置取消原因
if c.done == nil {
c.done = closedchan
} else {
close (c.done) // 主要逻辑:关闭 c.done 这个 channel
}
for child := range c.children { // 将子节点依次取消
child. cancel ( false , err)
}
c.children = nil
c.mu. Unlock ()
}
WithCancel
Copy // 新建一个cancel context
func WithCancel (parent Context ) (ctx Context , cancel CancelFunc ) {
c := newCancelCtx (parent)
propagateCancel (parent, & c) // 将新建的 c 加到其父节点的 child 列表里。当父上下文被取消时,子上下文也会被取消:
return & c, func () { c. cancel ( true , Canceled) }
}
func propagateCancel (parent Context , child canceler ) {
// parentCancelCtx这个函数会从parent开始向上找,直到找到一个可取消的context为止
// 即找到parent(含)最近的可取消祖先节点
if p, ok := parentCancelCtx (parent); ok {
p.mu. Lock ()
if p.children == nil {
p.children = make ( map [ canceler ] struct {})
}
p.children[child] = struct {}{} // 将child放到这个祖先节点的children列表里,以便祖先取消时能一次调用child的cancel方法
p.mu. Unlock ()
} else {
// 如果没找到可用的祖先节点,就新起一个协程,监听父节点的Done事件
go func () {
select {
case <- parent. Done ():
child. cancel ( false , parent. Err ())
case <- child. Done ():
}
}()
}
}
// 用作查祖先节点时的key,其值为0没用,用的时候都是取址(见下面函数,把 &cancelCtxKey 这个地址当成 key 来用了
var cancelCtxKey int
// 找到parent(含)最近的可取消祖先节点
func parentCancelCtx (parent Context ) ( * cancelCtx , bool ) {
p, ok := parent. Value ( & cancelCtxKey).( * cancelCtx )
if ! ok {
return nil , false
}
return p, true
}
valueCtx
Copy // 根据key查找value,如果找不到则递归从父节点向上找
func (c * cancelCtx ) Value (key interface {}) interface {} {
if key == & cancelCtxKey {
return c
}
return c.Context. Value (key) // 递归向上找
}
timerCtx
WithTimeout & WithDeadline
WithDeadline
内置了 timer
定时器,到期就执行 cancel
Withtimeout
实际上是 Withdeadline
的语法糖,使用 timeout
参数算了下 deadline
而已
Copy func WithTimeout (parent Context , timeout time . Duration ) ( Context , CancelFunc ) {
return WithDeadline (parent, time. Now (). Add (timeout))
}
func WithDeadline (parent Context , d time . Time ) ( Context , CancelFunc ) {
c := & timerCtx {
cancelCtx: newCancelCtx (parent),
deadline: d,
}
propagateCancel (parent, c)
c.mu. Lock ()
defer c.mu. Unlock ()
if c.err == nil {
// 这里设了 timer 定时器,到期执行 c.cancel 方法
c.timer = time. AfterFunc (dur, func () {
c. cancel ( true , DeadlineExceeded)
})
}
return c, func () { c. cancel ( true , Canceled) }
}
4. net/http中的实际应用
首先 Server
在开启服务时会创建一个 valueCtx
, 存储了server
的相关信息,之后每建立一条连接就会开启一个协程,并携带此valueCtx
。
Copy func (srv * Server ) Serve (l net . Listener ) error {
baseCtx := context. Background ()
ctx := context. WithValue (baseCtx, ServerContextKey, srv) // ServerContextKey = "http-server"
for {
rw, e := l. Accept ()
c := srv. newConn (rw)
go c. serve (ctx) // 开启新协程处理连接,并传递ctx
}
}
建立连接之后会基于传入的context
创建一个valueCtx
用于存储本地地址信息,之后在此基础上又创建了一个cancelCtx
,然后开始从当前连接中读取网络请求,每当读取到一个请求则会将该cancelCtx
传入,用以传递取消信号。一旦连接断开,即可发送取消信号,取消所有进行中的网络请求。
Copy // Serve a new connection.
func (c * conn ) serve (ctx context . Context ) {
ctx = context. WithValue (ctx, LocalAddrContextKey, c.rwc. LocalAddr ()) // 本地地址
ctx, cancelCtx := context. WithCancel (ctx)
c.cancelCtx = cancelCtx
defer cancelCtx ()
for {
w, err := c. readRequest (ctx)
serverHandler {c.server}. ServeHTTP (w, w.req)
}
}
读取到请求之后,会再次基于传入的context
创建新的cancelCtx
,并设置到当前请求对象req
上,同时生成的response
对象中cancelCtx
保存了当前context
取消的方法。
Copy func (c * conn ) readRequest (ctx context . Context ) (w * response , err error ) {
req, err := readRequest (c.bufr, keepHostHeader)
ctx, cancelCtx := context. WithCancel (ctx)
req.ctx = ctx
w = & response {
conn: c,
cancelCtx: cancelCtx,
req: req,
reqBody: req.Body
}
return w, nil
}
在整个server
处理流程中,使用了一条 context
链贯穿Server
、Connection
、Request
,不仅将上游的信息共享给下游任务,同时实现了上游可发送取消信号取消所有下游任务,而下游任务自行取消不会影响上游任务。
另外有两点值得注意:
context
的取消是柔性的,上游任务仅仅使用 context
通知下游不再需要,不会直接干涉和中断下游任务的执行,由下游任务自行处理取消逻辑,下游自己写 select <- ctx.Done()
的逻辑。
context
是线程安全的,其本身是不可变的(immutable),且并发情况下会加锁。
参考
hudingyu - 深入理解Golang之context
咖啡色的羊驼 - 由浅入深聊聊Golang的context