4.Channel的实现

Go 语言中最常见的、也是经常被人提及的设计模式就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程竞争,我们需要限制同一时间能够读写这些变量的线程数量,然而这与 Go 语言鼓励的设计并不相同。

Go 语言使用了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。

1. 数据结构

核心结构为:

  1. 一把锁

  2. 两个链表,存放等待读和等待写的协程

  3. 有缓冲区的话,一个环型链表作为缓冲区

在一些关键路径上通过 CAS 实现无锁快速操作,提升性能

//src/runtime/chan.go
type hchan struct {
    qcount   uint           // 当前队列里的元素个数
    dataqsiz uint           // 环形队列长度,即缓冲区的大小,即 make(chan T, N) 中的 N
    buf      unsafe.Pointer // 环形队列
    elemsize uint16         // 每个元素的大小
    closed   uint32         // 标识当前通道是否处于关闭状态,创建通道后,该字段设置0,即打开通道;通道调用close将其设置为1,通道关闭
    elemtype *_type         // 元素类型,用于数据传递过程中的赋值
    sendx    uint           // 环形队列尾指针
    recvx    uint           // 环形队列头指针

    recvq waitq             // 等待读消息的goroutine队列
    sendq waitq             // 等待写消息的goroutine队列

    lock mutex              //互斥锁
}
  
// sudog 代表 goroutine
type waitq struct {
    first *sudog // 这个是链表,通过next指向下一个sudog
    last  *sudog // 链表尾部
}

基本用法

_, ok := <-ch // ok仅用于判断通道是否关闭,不用于判断是否有数据

// select 用法
select {
	case c := <- ch:
		println(c)
    default:
        println("default")
}

写入

当我们想要向 Channel 发送数据时,就需要使用 ch <- i 语句,编译器会将它解析成 OSEND 节点并在 cmd/compile/internal/gc.walkexpr 中转换成 runtime.chansend1 函数,里面实际调用 chansend 函数,其逻辑如下:

  1. 加锁

  2. 如果 recvq 中有等待读的协程,则直接将元素写入该协程的栈里,并修改协程状态,在下次调度时唤醒它

    1. 这步节省了一个锁和内存 copy 的步骤。一般共享内存是:G1加锁、G1写入堆、G1解锁;G2加锁、G2读堆拷贝到栈、G2解锁

    2. 这里直接是:G1加锁、G1拷贝数据到G2的栈、G1唤醒G2、G1解锁

  3. 如果 recvq 为空,则检查缓冲区是否可用,如果可用就复制数据到缓冲区中,并更新队列索引

  4. 如果缓冲区已经满了,则

    1. 将协程包装成 sudog 链表节点结构,数据保存到 sudog.elem 字段,然后将节点追加到写等待者队列( sendq 链表)中

    2. gopark 该协程

  5. 写入完成释放锁

// runtime/chan.go
// 注意这里的 block 并非表示 channel 是阻塞/非阻塞的,而是表示是否通过 select: case chan <- data 的方式来访问
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {                                      // select 模式下空 channel 直接返回;否则报错
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) // 向 nil channel 写会直接 gopark,进入阻塞
        throw("unreachable")
    }
    
    if !block && c.closed == 0 && full(c) {            // 快速返回:select 模式下 channel 写满时直接返回 false
        return false
    }
		
  	// 1. 加锁
    lock(&c.lock)

    // 2. 如果等待队列里有等待的读者,直接把数据拷贝到它的栈里;并在下次调度时唤醒它
    if sg := c.recvq.dequeue(); sg != nil {            
        send(c, sg, ep, func() { unlock(&c.lock) }, 3) // send 里面使用了 memmove 将数据复制给接收者
        return true
    }

    // 3. 缓冲区还有空间,写入缓冲区并更新索引
    if c.qcount < c.dataqsiz {
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // 4.1 缓冲区没有空间了,非阻塞模式(通过 select 方法写)下直接返回
    if !block {
        unlock(&c.lock)
        return false
    }

    // 4.2 同步模式下则会挂起当前协程,放到写等待队列里
    gp := getg()
    mysg := acquireSudog() // 复用或新建一个 sudog
    mysg.releasetime = 0

    mysg.elem = ep         // 保存发送的数据
    mysg.g = gp            
    mysg.c = c
    c.sendq.enqueue(mysg)  // 将 sudog 放入发送者队列
    
    // 陷入休眠
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) 

    // 到这里,说明休眠结束,有读者把数据读走了
    
    KeepAlive(ep) // 里面其实是 println(ep),用来保证读者读数据前,数据不被 GC 掉

    // 恢复 G 的状态,并将 sudog 放回缓存池
    gp.waiting = nil
    gp.activeStackChans = false
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)  // 将 sudog 放回缓存池,下次复用

    return true
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep) // 使用了 memmove 将数据复制给接收者
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
  
    // 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 
    // 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方
    goready(gp, skip+1)                
}

读取

Go 语言中可以使用两种不同的方式去接收 Channel 中的数据:

i <- ch
i, ok <- ch

这两种不同的方法经过编译器的处理都会变成 ORECV 类型的节点,后者会在类型检查阶段被转换成 OAS2RECV 类型。最终都会调用 runtime.chanrecv。其逻辑如下:

  1. 快速返回:先判断 channel 是否关闭或者为空,是则直接返回

  2. 加锁

  3. 尝试从 sendq 等待队列中获取等待写的协程

    1. 如果有写等待者

      1. 没有缓冲区:取出 goroutine 并读取数据,然后唤醒这个 goroutine,结束读取释放锁

      2. 有缓冲区 (有缓冲区的情况下还有等待的 goroutine,说明缓冲区此时满了):从缓冲区队列队头取数据,作为返回的值;再把刚刚 sendq 里取出的那个 goroutine 放到缓冲队列队尾(保证先进先出)

    2. 如果没有写等待者

      1. 缓冲区有数据:直接读取缓冲区数据

      2. 没有缓冲区或者缓冲区为空,将当前协程加入到 recvq 读等待者队列,进入睡眠,等待有数据写入时被唤醒

// 读取的源码和写入其实大同小异
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 1. 判断可否快速返回
    if c == nil {          
        if !block {  // select {case <-chan} 的写法为非阻塞模式,该模式下空 channel 直接返回
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) //否则休眠协程,进入阻塞
        throw("unreachable")
    }
    
    if !block && empty(c) { // 快速返回:select 模式下 channel 为空时直接返回 false
        if atomic.Load(&c.closed) == 0 {
            return
        }
        if empty(c) {       // 还做了 double check
            return true, false
        }
    }
    
    // 2. 加锁
    lock(&c.lock)
    
    // channel 关闭了且缓冲区无数据,返回一个类型默认的零值
    if c.closed != 0 && c.qcount == 0 { 
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep) // 返回 ep 类型默认的零值
        }
        return true, false
    }

    // 3.1 如果有写等待者
    if sg := c.sendq.dequeue(); sg != nil {
        // 如果 sendq 中有等待写的 goroutine,则判断 buffer
        //   如果 buffer 为空,直接从 sender 的栈中读数据
        //   否则从 buffer 头部读数据,将 sender 的数据放入 buffer 队尾
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
    
    // 3.2 没有写等待者
    // 3.2.1 缓冲区里有数据,从缓冲区读取
    if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)
        c.recvx++
        // 循环队列,下标满了则重置
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
    
    // 3.2.2 缓冲区无数据,将 g 放入 recvq 并阻塞
    mysg := acquireSudog()
    c.recvq.enqueue(mysg)

    // 立刻触发一次调度
   	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
}

// 从 sendq 里读数据
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 如果 buffer 为空,直接从 sendq 里的 sg 复制元素到 ep
    if c.dataqsiz == 0 {
        if ep != nil {
            recvDirect(c.elemtype, sg, ep)
        }
    // 如果 buffer 不为空,那么 buffer 一定是满的
    } else {
        qp := chanbuf(c, c.recvx)             // 取出 buffer 的头部元素,复制到 ep
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemmove(c.elemtype, qp, sg.elem) // 再把 sg 的值复制回头部那个元素
        c.recvx++                             // 这里很 tricky,没有用弹出头部、再插入队尾的做法
        if c.recvx == c.dataqsiz {            // 而是直接原地更新头部元素值
            c.recvx = 0                       // 由于 buffer 是个环形队列,因此更新队尾偏移量就可以了
        }
        c.sendx = c.recvx                     // 也更新 buffer 的写入偏移量
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    goready(gp, skip+1)                       // 下次调度唤醒休眠的 sudog
}

close

编译器会将用于关闭管道的 close 关键字转换成 OCLOSE 节点以及 runtime.closechan 函数。

关闭操作将所有排队者唤醒,并设置 closedparam 字段。

func closechan(c *hchan) {
    // channel 为空指针时 panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
  
    // channel 已关闭时 panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    c.closed = 1

    
    // 释放所有接收者
    var glist gList
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg) // 这个参数表名唤醒者是 closechan
        sg.success = false
        glist.push(gp)
    }

    // 释放所有发送者(发送者继续发送会panic)
    for {
        sg := c.sendq.dequeue()
        // 和上面一样
        ...
    }
    unlock(&c.lock)

    // 唤醒所有 G
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3) // 触发调度
    }
}

select

对读写 channel<- chanchan -> 这种写法,编译器会翻译成对函数的调用。

对于 x := <- c 这类的阻塞操作,会编译为 chansend1(c, x)

对于 select,则编译为 if selectnbsend(c, x) {} else {} 这种逻辑。

两者底层都调用的 chansend 函数,但传的 block 参数不同。chansend1 传的是 trueselectfalse

这样在使用 select 的写法时,管道才能不阻塞的立即返回 falsecase 才能跳过这个 false,无阻塞的继续向下判断 case2case3

// c <- x 
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

// select case c <- 写法
// 编译器会将
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// 编译为
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(c, elem, false, getcallerpc())
}

// x:= <- c 
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

// x, ok := <- c
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

// select case <- c 写法
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
    return chanrecv(c, elem, false)
}

select case

每条 case 会被包装成一个 scase 对象,里面包含了 channel,和数据元素。

//runtime/select.go
type scase struct {
    c    *hchan         // chan
    elem unsafe.Pointer // data element
}

select - case 则会被编译为对 selectgo 函数的调用。里面将所有 case 组成了一个 scase 数组,以随机顺序遍历这个数组。 如果能读写数据,就返回 否则新建一个 sudog 放到 scase.c 这个 channel 的等待队列里,等待唤醒。 如果所有 scase 里都没值,则最后执行 default

如果没有 default,则将 sudog 放入所有 scase.c 的等待队列里

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
    cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))      // case 数组
    order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0)) // 顺序数组

    ncases := nsends + nrecvs                            // case 数量
    scases := cas1[:ncases:ncases]                       // case 数组
    pollorder := order1[:ncases:ncases]                  // 遍历顺序
    lockorder := order1[ncases:][:ncases:ncases]         // 加锁顺序
    
    norder := 0                                          
    for i := range scases {                 // 这个循环用于生成随机排列顺序,后续按这个顺序遍历
        cas := &scases[i]
        j := fastrandn(uint32(norder + 1))  // 居然是蓄水池抽样
        pollorder[norder] = pollorder[j]    // 以 1/i+1 的概率对调 j 和 i+1
        pollorder[j] = uint16(i)
        norder++
    }
    
    // 阶段1: 随机顺序遍历
    for _, casei := range pollorder { 
        casi = int(casei)              // 大无语命名
        cas = &scases[casi]
        c = cas.c

        // 这里会根据不同情况通过 goto 语句跳转到函数内部的不同标签执行相应的逻辑
        if casi >= nsends {
            sg = c.sendq.dequeue()   // 从等待队列里取 sudog
            if sg != nil {           // 取到了,就执行同步读
                goto recv
            }
            if c.qcount > 0 {        // 否则判断 buffer
                goto bufrecv         // 有 buffer 则读 buffer
            }
            if c.closed != 0 {
                goto rclose
            }
        } else {                     // 和上面差不多
            ...
        }
    }

    if !block {                      // select 里有 default 时,block = false,说明为非阻塞,执行这里
        selunlock(scases, lockorder) // 解锁
        casi = -1                    // 返回值为 -1
        goto retc                    // 返回
    }

    // 阶段2:所有 case 都阻塞住了,且没有 default
    // 将 select 挂到所有 case 的等待列表里,等待被唤醒
    gp = getg()
    nextp = &gp.waiting
    for _, casei := range lockorder { 
        casi = int(casei)
        cas = &scases[casi]
        c = cas.c
        sg := acquireSudog()    // 新建 sudog
        sg.g = gp
        sg.isSelect = true
        sg.elem = cas.elem
        sg.c = c
        if casi < nsends {
            c.sendq.enqueue(sg) // 挂到写等待队列
        } else {
            c.recvq.enqueue(sg) // 挂到读等待队列
        }
    }
}

产生阻塞/panic的情况

以下均不考虑 select: case <- chan 的情况

  1. 读空的 channel

  2. 写无缓冲 channel / 写有缓冲 channel 但数据超过缓冲区

  3. 读写 nil channel(阻塞。主线程下会导致 fatal error: all goroutines are asleep

样例

读空 channel,阻塞

t := make(chan int)
x, ok := <- t //阻塞在这里。主线程被放进t的recvq了,然后主线程被挂起,等待t写入后唤醒主线程

读已关闭的 channel,正常执行

t := make(chan int)
close(t)
x, ok := <- t //0, false

写无缓冲 channel,阻塞

t := make(chan int)
t <- 1 //阻塞在这里,必须有其他gorutine消费才可以继续执行

参考

马遥 - 如何理解 Golang 中 "不要通过共享内存来通信,而应该通过通信来共享内存"

Go channel 实现原理分析

Go channel 源码

Last updated