Go 语言中最常见的、也是经常被人提及的设计模式就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程竞争,我们需要限制同一时间能够读写这些变量的线程数量,然而这与 Go 语言鼓励的设计并不相同。
Go 语言使用了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。
1. 数据结构
核心结构为:
在一些关键路径上通过 CAS
实现无锁快速操作,提升性能
//src/runtime/chan.go
type hchan struct {
qcount uint // 当前队列里的元素个数
dataqsiz uint // 环形队列长度,即缓冲区的大小,即 make(chan T, N) 中的 N
buf unsafe.Pointer // 环形队列
elemsize uint16 // 每个元素的大小
closed uint32 // 标识当前通道是否处于关闭状态,创建通道后,该字段设置0,即打开通道;通道调用close将其设置为1,通道关闭
elemtype *_type // 元素类型,用于数据传递过程中的赋值
sendx uint // 环形队列尾指针
recvx uint // 环形队列头指针
recvq waitq // 等待读消息的goroutine队列
sendq waitq // 等待写消息的goroutine队列
lock mutex //互斥锁
}
// sudog 代表 goroutine
type waitq struct {
first *sudog // 这个是链表,通过next指向下一个sudog
last *sudog // 链表尾部
}
基本用法
_, ok := <-ch // ok仅用于判断通道是否关闭,不用于判断是否有数据
// select 用法
select {
case c := <- ch:
println(c)
default:
println("default")
}
写入
当我们想要向 Channel 发送数据时,就需要使用 ch <- i
语句,编译器会将它解析成 OSEND
节点并在 cmd/compile/internal/gc.walkexpr
中转换成 runtime.chansend1
函数,里面实际调用 chansend
函数,其逻辑如下:
如果 recvq
中有等待读的协程,则直接将元素写入该协程的栈里,并修改协程状态,在下次调度时唤醒它
这步节省了一个锁和内存 copy
的步骤。一般共享内存是:G1加锁、G1写入堆、G1解锁;G2加锁、G2读堆拷贝到栈、G2解锁
这里直接是:G1加锁、G1拷贝数据到G2的栈、G1唤醒G2、G1解锁
如果 recvq
为空,则检查缓冲区是否可用,如果可用就复制数据到缓冲区中,并更新队列索引
如果缓冲区已经满了,则
将协程包装成 sudog
链表节点结构,数据保存到 sudog.elem
字段,然后将节点追加到写等待者队列( sendq
链表)中
// runtime/chan.go
// 注意这里的 block 并非表示 channel 是阻塞/非阻塞的,而是表示是否通过 select: case chan <- data 的方式来访问
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil { // select 模式下空 channel 直接返回;否则报错
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) // 向 nil channel 写会直接 gopark,进入阻塞
throw("unreachable")
}
if !block && c.closed == 0 && full(c) { // 快速返回:select 模式下 channel 写满时直接返回 false
return false
}
// 1. 加锁
lock(&c.lock)
// 2. 如果等待队列里有等待的读者,直接把数据拷贝到它的栈里;并在下次调度时唤醒它
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3) // send 里面使用了 memmove 将数据复制给接收者
return true
}
// 3. 缓冲区还有空间,写入缓冲区并更新索引
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// 4.1 缓冲区没有空间了,非阻塞模式(通过 select 方法写)下直接返回
if !block {
unlock(&c.lock)
return false
}
// 4.2 同步模式下则会挂起当前协程,放到写等待队列里
gp := getg()
mysg := acquireSudog() // 复用或新建一个 sudog
mysg.releasetime = 0
mysg.elem = ep // 保存发送的数据
mysg.g = gp
mysg.c = c
c.sendq.enqueue(mysg) // 将 sudog 放入发送者队列
// 陷入休眠
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 到这里,说明休眠结束,有读者把数据读走了
KeepAlive(ep) // 里面其实是 println(ep),用来保证读者读数据前,数据不被 GC 掉
// 恢复 G 的状态,并将 sudog 放回缓存池
gp.waiting = nil
gp.activeStackChans = false
gp.param = nil
mysg.c = nil
releaseSudog(mysg) // 将 sudog 放回缓存池,下次复用
return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep) // 使用了 memmove 将数据复制给接收者
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
// 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable
// 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方
goready(gp, skip+1)
}
读取
Go 语言中可以使用两种不同的方式去接收 Channel 中的数据:
这两种不同的方法经过编译器的处理都会变成 ORECV
类型的节点,后者会在类型检查阶段被转换成 OAS2RECV
类型。最终都会调用 runtime.chanrecv
。其逻辑如下:
快速返回:先判断 channel
是否关闭或者为空,是则直接返回
尝试从 sendq
等待队列中获取等待写的协程
如果有写等待者
没有缓冲区:取出 goroutine
并读取数据,然后唤醒这个 goroutine
,结束读取释放锁
有缓冲区 (有缓冲区的情况下还有等待的 goroutine
,说明缓冲区此时满了):从缓冲区队列队头取数据,作为返回的值;再把刚刚 sendq
里取出的那个 goroutine
放到缓冲队列队尾(保证先进先出)
如果没有写等待者
没有缓冲区或者缓冲区为空,将当前协程加入到 recvq
读等待者队列,进入睡眠,等待有数据写入时被唤醒
// 读取的源码和写入其实大同小异
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 1. 判断可否快速返回
if c == nil {
if !block { // select {case <-chan} 的写法为非阻塞模式,该模式下空 channel 直接返回
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) //否则休眠协程,进入阻塞
throw("unreachable")
}
if !block && empty(c) { // 快速返回:select 模式下 channel 为空时直接返回 false
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) { // 还做了 double check
return true, false
}
}
// 2. 加锁
lock(&c.lock)
// channel 关闭了且缓冲区无数据,返回一个类型默认的零值
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep) // 返回 ep 类型默认的零值
}
return true, false
}
// 3.1 如果有写等待者
if sg := c.sendq.dequeue(); sg != nil {
// 如果 sendq 中有等待写的 goroutine,则判断 buffer
// 如果 buffer 为空,直接从 sender 的栈中读数据
// 否则从 buffer 头部读数据,将 sender 的数据放入 buffer 队尾
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 3.2 没有写等待者
// 3.2.1 缓冲区里有数据,从缓冲区读取
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
c.recvx++
// 循环队列,下标满了则重置
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 3.2.2 缓冲区无数据,将 g 放入 recvq 并阻塞
mysg := acquireSudog()
c.recvq.enqueue(mysg)
// 立刻触发一次调度
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
}
// 从 sendq 里读数据
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果 buffer 为空,直接从 sendq 里的 sg 复制元素到 ep
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
// 如果 buffer 不为空,那么 buffer 一定是满的
} else {
qp := chanbuf(c, c.recvx) // 取出 buffer 的头部元素,复制到 ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem) // 再把 sg 的值复制回头部那个元素
c.recvx++ // 这里很 tricky,没有用弹出头部、再插入队尾的做法
if c.recvx == c.dataqsiz { // 而是直接原地更新头部元素值
c.recvx = 0 // 由于 buffer 是个环形队列,因此更新队尾偏移量就可以了
}
c.sendx = c.recvx // 也更新 buffer 的写入偏移量
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1) // 下次调度唤醒休眠的 sudog
}
close
编译器会将用于关闭管道的 close
关键字转换成 OCLOSE
节点以及 runtime.closechan
函数。
关闭操作将所有排队者唤醒,并设置 closed
、param
字段。
func closechan(c *hchan) {
// channel 为空指针时 panic
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// channel 已关闭时 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
// 释放所有接收者
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = unsafe.Pointer(sg) // 这个参数表名唤醒者是 closechan
sg.success = false
glist.push(gp)
}
// 释放所有发送者(发送者继续发送会panic)
for {
sg := c.sendq.dequeue()
// 和上面一样
...
}
unlock(&c.lock)
// 唤醒所有 G
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3) // 触发调度
}
}
select
对读写 channel
的 <- chan
、chan ->
这种写法,编译器会翻译成对函数的调用。
写
对于 x := <- c
这类的阻塞操作,会编译为 chansend1(c, x)
。
对于 select
,则编译为 if selectnbsend(c, x) {} else {}
这种逻辑。
两者底层都调用的 chansend
函数,但传的 block
参数不同。chansend1
传的是 true
,select
传 false
。
这样在使用 select
的写法时,管道才能不阻塞的立即返回 false
,case
才能跳过这个 false
,无阻塞的继续向下判断 case2
、case3
。
// c <- x
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
// select case c <- 写法
// 编译器会将
// select {
// case c <- v:
// ... foo
// default:
// ... bar
// }
//
// 编译为
//
// if selectnbsend(c, v) {
// ... foo
// } else {
// ... bar
// }
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
读
// x:= <- c
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// x, ok := <- c
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
// select case <- c 写法
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
select case
每条 case
会被包装成一个 scase
对象,里面包含了 channel
,和数据元素。
//runtime/select.go
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}
select - case
则会被编译为对 selectgo
函数的调用。里面将所有 case
组成了一个 scase
数组,以随机顺序遍历这个数组。 如果能读写数据,就返回 否则新建一个 sudog
放到 scase.c
这个 channel
的等待队列里,等待唤醒。 如果所有 scase
里都没值,则最后执行 default
。
如果没有 default
,则将 sudog
放入所有 scase.c
的等待队列里
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0)) // case 数组
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0)) // 顺序数组
ncases := nsends + nrecvs // case 数量
scases := cas1[:ncases:ncases] // case 数组
pollorder := order1[:ncases:ncases] // 遍历顺序
lockorder := order1[ncases:][:ncases:ncases] // 加锁顺序
norder := 0
for i := range scases { // 这个循环用于生成随机排列顺序,后续按这个顺序遍历
cas := &scases[i]
j := fastrandn(uint32(norder + 1)) // 居然是蓄水池抽样
pollorder[norder] = pollorder[j] // 以 1/i+1 的概率对调 j 和 i+1
pollorder[j] = uint16(i)
norder++
}
// 阶段1: 随机顺序遍历
for _, casei := range pollorder {
casi = int(casei) // 大无语命名
cas = &scases[casi]
c = cas.c
// 这里会根据不同情况通过 goto 语句跳转到函数内部的不同标签执行相应的逻辑
if casi >= nsends {
sg = c.sendq.dequeue() // 从等待队列里取 sudog
if sg != nil { // 取到了,就执行同步读
goto recv
}
if c.qcount > 0 { // 否则判断 buffer
goto bufrecv // 有 buffer 则读 buffer
}
if c.closed != 0 {
goto rclose
}
} else { // 和上面差不多
...
}
}
if !block { // select 里有 default 时,block = false,说明为非阻塞,执行这里
selunlock(scases, lockorder) // 解锁
casi = -1 // 返回值为 -1
goto retc // 返回
}
// 阶段2:所有 case 都阻塞住了,且没有 default
// 将 select 挂到所有 case 的等待列表里,等待被唤醒
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog() // 新建 sudog
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.c = c
if casi < nsends {
c.sendq.enqueue(sg) // 挂到写等待队列
} else {
c.recvq.enqueue(sg) // 挂到读等待队列
}
}
}
产生阻塞/panic的情况
以下均不考虑 select: case <- chan
的情况
写无缓冲 channel
/ 写有缓冲 channel
但数据超过缓冲区
读写 nil channel
(阻塞。主线程下会导致 fatal error: all goroutines are asleep
)
样例
读空 channel
,阻塞
t := make(chan int)
x, ok := <- t //阻塞在这里。主线程被放进t的recvq了,然后主线程被挂起,等待t写入后唤醒主线程
读已关闭的 channel
,正常执行
t := make(chan int)
close(t)
x, ok := <- t //0, false
写无缓冲 channel
,阻塞
t := make(chan int)
t <- 1 //阻塞在这里,必须有其他gorutine消费才可以继续执行
参考
马遥 - 如何理解 Golang 中 "不要通过共享内存来通信,而应该通过通信来共享内存"
Go channel 实现原理分析
Go channel 源码