4.Channel的实现

Go 语言中最常见的、也是经常被人提及的设计模式就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程竞争,我们需要限制同一时间能够读写这些变量的线程数量,然而这与 Go 语言鼓励的设计并不相同。

Go 语言使用了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。

1. 数据结构

核心结构为:

  1. 一把锁

  2. 两个链表,存放等待读和等待写的协程

  3. 有缓冲区的话,一个环型链表作为缓冲区

在一些关键路径上通过 CAS 实现无锁快速操作,提升性能

//src/runtime/chan.go
type hchan struct {
    qcount   uint           // 当前队列里的元素个数
    dataqsiz uint           // 环形队列长度,即缓冲区的大小,即 make(chan T, N) 中的 N
    buf      unsafe.Pointer // 环形队列
    elemsize uint16         // 每个元素的大小
    closed   uint32         // 标识当前通道是否处于关闭状态,创建通道后,该字段设置0,即打开通道;通道调用close将其设置为1,通道关闭
    elemtype *_type         // 元素类型,用于数据传递过程中的赋值
    sendx    uint           // 环形队列尾指针
    recvx    uint           // 环形队列头指针

    recvq waitq             // 等待读消息的goroutine队列
    sendq waitq             // 等待写消息的goroutine队列

    lock mutex              //互斥锁
}
  
// sudog 代表 goroutine
type waitq struct {
    first *sudog // 这个是链表,通过next指向下一个sudog
    last  *sudog // 链表尾部
}

基本用法

写入

当我们想要向 Channel 发送数据时,就需要使用 ch <- i 语句,编译器会将它解析成 OSEND 节点并在 cmd/compile/internal/gc.walkexpr 中转换成 runtime.chansend1 函数,里面实际调用 chansend 函数,其逻辑如下:

  1. 加锁

  2. 如果 recvq 中有等待读的协程,则直接将元素写入该协程的栈里,并修改协程状态,在下次调度时唤醒它

    1. 这步节省了一个锁和内存 copy 的步骤。一般共享内存是:G1加锁、G1写入堆、G1解锁;G2加锁、G2读堆拷贝到栈、G2解锁

    2. 这里直接是:G1加锁、G1拷贝数据到G2的栈、G1唤醒G2、G1解锁

  3. 如果 recvq 为空,则检查缓冲区是否可用,如果可用就复制数据到缓冲区中,并更新队列索引

  4. 如果缓冲区已经满了,则

    1. 将协程包装成 sudog 链表节点结构,数据保存到 sudog.elem 字段,然后将节点追加到写等待者队列( sendq 链表)中

    2. gopark 该协程

  5. 写入完成释放锁

读取

Go 语言中可以使用两种不同的方式去接收 Channel 中的数据:

这两种不同的方法经过编译器的处理都会变成 ORECV 类型的节点,后者会在类型检查阶段被转换成 OAS2RECV 类型。最终都会调用 runtime.chanrecv。其逻辑如下:

  1. 快速返回:先判断 channel 是否关闭或者为空,是则直接返回

  2. 加锁

  3. 尝试从 sendq 等待队列中获取等待写的协程

    1. 如果有写等待者

      1. 没有缓冲区:取出 goroutine 并读取数据,然后唤醒这个 goroutine,结束读取释放锁

      2. 有缓冲区 (有缓冲区的情况下还有等待的 goroutine,说明缓冲区此时满了):从缓冲区队列队头取数据,作为返回的值;再把刚刚 sendq 里取出的那个 goroutine 放到缓冲队列队尾(保证先进先出)

    2. 如果没有写等待者

      1. 缓冲区有数据:直接读取缓冲区数据

      2. 没有缓冲区或者缓冲区为空,将当前协程加入到 recvq 读等待者队列,进入睡眠,等待有数据写入时被唤醒

close

编译器会将用于关闭管道的 close 关键字转换成 OCLOSE 节点以及 runtime.closechan 函数。

关闭操作将所有排队者唤醒,并设置 closedparam 字段。

select

对读写 channel<- chanchan -> 这种写法,编译器会翻译成对函数的调用。

对于 x := <- c 这类的阻塞操作,会编译为 chansend1(c, x)

对于 select,则编译为 if selectnbsend(c, x) {} else {} 这种逻辑。

两者底层都调用的 chansend 函数,但传的 block 参数不同。chansend1 传的是 trueselectfalse

这样在使用 select 的写法时,管道才能不阻塞的立即返回 falsecase 才能跳过这个 false,无阻塞的继续向下判断 case2case3

select case

每条 case 会被包装成一个 scase 对象,里面包含了 channel,和数据元素。

select - case 则会被编译为对 selectgo 函数的调用。里面将所有 case 组成了一个 scase 数组,以随机顺序遍历这个数组。 如果能读写数据,就返回 否则新建一个 sudog 放到 scase.c 这个 channel 的等待队列里,等待唤醒。 如果所有 scase 里都没值,则最后执行 default

如果没有 default,则将 sudog 放入所有 scase.c 的等待队列里

产生阻塞/panic的情况

以下均不考虑 select: case <- chan 的情况

  1. 读空的 channel

  2. 写无缓冲 channel / 写有缓冲 channel 但数据超过缓冲区

  3. 读写 nil channel(阻塞。主线程下会导致 fatal error: all goroutines are asleep

操作
nil channel
正常 channel
已关闭 channel

<- ch

阻塞

成功 / 阻塞

读到零值

ch <-

阻塞

成功 / 阻塞

panic

close(ch)

panic

成功

panic

样例

读空 channel,阻塞

读已关闭的 channel,正常执行

写无缓冲 channel,阻塞

参考

马遥 - 如何理解 Golang 中 "不要通过共享内存来通信,而应该通过通信来共享内存"

Go channel 实现原理分析

Go channel 源码

Last updated