// 读取的源码和写入其实大同小异
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 1. 判断可否快速返回
if c == nil {
if !block { // select {case <-chan} 的写法为非阻塞模式,该模式下空 channel 直接返回
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) //否则休眠协程,进入阻塞
throw("unreachable")
}
if !block && empty(c) { // 快速返回:select 模式下 channel 为空时直接返回 false
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) { // 还做了 double check
return true, false
}
}
// 2. 加锁
lock(&c.lock)
// channel 关闭了且缓冲区无数据,返回一个类型默认的零值
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep) // 返回 ep 类型默认的零值
}
return true, false
}
// 3.1 如果有写等待者
if sg := c.sendq.dequeue(); sg != nil {
// 如果 sendq 中有等待写的 goroutine,则判断 buffer
// 如果 buffer 为空,直接从 sender 的栈中读数据
// 否则从 buffer 头部读数据,将 sender 的数据放入 buffer 队尾
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 3.2 没有写等待者
// 3.2.1 缓冲区里有数据,从缓冲区读取
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
c.recvx++
// 循环队列,下标满了则重置
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 3.2.2 缓冲区无数据,将 g 放入 recvq 并阻塞
mysg := acquireSudog()
c.recvq.enqueue(mysg)
// 立刻触发一次调度
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
}
// 从 sendq 里读数据
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果 buffer 为空,直接从 sendq 里的 sg 复制元素到 ep
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
// 如果 buffer 不为空,那么 buffer 一定是满的
} else {
qp := chanbuf(c, c.recvx) // 取出 buffer 的头部元素,复制到 ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemmove(c.elemtype, qp, sg.elem) // 再把 sg 的值复制回头部那个元素
c.recvx++ // 这里很 tricky,没有用弹出头部、再插入队尾的做法
if c.recvx == c.dataqsiz { // 而是直接原地更新头部元素值
c.recvx = 0 // 由于 buffer 是个环形队列,因此更新队尾偏移量就可以了
}
c.sendx = c.recvx // 也更新 buffer 的写入偏移量
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1) // 下次调度唤醒休眠的 sudog
}