# 4.Channel的实现

Go 语言中最常见的、也是经常被人提及的设计模式就是：不要通过共享内存的方式进行通信，而是应该通过通信的方式共享内存。在很多主流的编程语言中，多个线程传递数据的方式一般都是共享内存，为了解决线程竞争，我们需要限制同一时间能够读写这些变量的线程数量，然而这与 Go 语言鼓励的设计并不相同。

Go 语言使用了一种不同的并发模型，即通信顺序进程（Communicating sequential processes，CSP）。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介，Goroutine 之间会通过 Channel 传递数据。

## 1. 数据结构

核心结构为：

1. 一把锁
2. 两个链表，存放等待读和等待写的协程
3. 有缓冲区的话，一个环型链表作为缓冲区

在一些关键路径上通过 `CAS` 实现无锁快速操作，提升性能

```go
//src/runtime/chan.go
type hchan struct {
    qcount   uint           // 当前队列里的元素个数
    dataqsiz uint           // 环形队列长度，即缓冲区的大小，即 make(chan T, N) 中的 N
    buf      unsafe.Pointer // 环形队列
    elemsize uint16         // 每个元素的大小
    closed   uint32         // 标识当前通道是否处于关闭状态，创建通道后，该字段设置0，即打开通道；通道调用close将其设置为1，通道关闭
    elemtype *_type         // 元素类型，用于数据传递过程中的赋值
    sendx    uint           // 环形队列尾指针
    recvx    uint           // 环形队列头指针

    recvq waitq             // 等待读消息的goroutine队列
    sendq waitq             // 等待写消息的goroutine队列

    lock mutex              //互斥锁
}
  
// sudog 代表 goroutine
type waitq struct {
    first *sudog // 这个是链表，通过next指向下一个sudog
    last  *sudog // 链表尾部
}
```

### 基本用法

```go
_, ok := <-ch // ok仅用于判断通道是否关闭，不用于判断是否有数据

// select 用法
select {
	case c := <- ch:
		println(c)
    default:
        println("default")
}
```

### 写入

当我们想要向 Channel 发送数据时，就需要使用 `ch <- i` 语句，编译器会将它解析成 `OSEND` 节点并在 `cmd/compile/internal/gc.walkexpr` 中转换成 `runtime.chansend1` 函数，里面实际调用 `chansend` 函数，其逻辑如下：

1. 加锁
2. 如果 `recvq` 中有等待读的协程，则直接将元素写入该协程的栈里，并修改协程状态，在下次调度时唤醒它
   1. 这步节省了一个锁和内存 `copy` 的步骤。一般共享内存是：G1加锁、G1写入堆、G1解锁；G2加锁、G2读堆拷贝到栈、G2解锁
   2. 这里直接是：G1加锁、G1拷贝数据到G2的栈、G1唤醒G2、G1解锁
3. 如果 `recvq` 为空，则检查缓冲区是否可用，如果可用就复制数据到缓冲区中，并更新队列索引
4. 如果缓冲区已经满了，则
   1. 将协程包装成 `sudog` 链表节点结构，数据保存到 `sudog.elem` 字段，然后将节点追加到写等待者队列（ `sendq` 链表）中
   2. `gopark` 该协程
5. 写入完成释放锁

```go
// runtime/chan.go
// 注意这里的 block 并非表示 channel 是阻塞/非阻塞的，而是表示是否通过 select: case chan <- data 的方式来访问
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {                                      // select 模式下空 channel 直接返回；否则报错
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) // 向 nil channel 写会直接 gopark，进入阻塞
        throw("unreachable")
    }
    
    if !block && c.closed == 0 && full(c) {            // 快速返回：select 模式下 channel 写满时直接返回 false
        return false
    }
		
  	// 1. 加锁
    lock(&c.lock)

    // 2. 如果等待队列里有等待的读者，直接把数据拷贝到它的栈里；并在下次调度时唤醒它
    if sg := c.recvq.dequeue(); sg != nil {            
        send(c, sg, ep, func() { unlock(&c.lock) }, 3) // send 里面使用了 memmove 将数据复制给接收者
        return true
    }

    // 3. 缓冲区还有空间，写入缓冲区并更新索引
    if c.qcount < c.dataqsiz {
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // 4.1 缓冲区没有空间了，非阻塞模式（通过 select 方法写）下直接返回
    if !block {
        unlock(&c.lock)
        return false
    }

    // 4.2 同步模式下则会挂起当前协程，放到写等待队列里
    gp := getg()
    mysg := acquireSudog() // 复用或新建一个 sudog
    mysg.releasetime = 0

    mysg.elem = ep         // 保存发送的数据
    mysg.g = gp            
    mysg.c = c
    c.sendq.enqueue(mysg)  // 将 sudog 放入发送者队列
    
    // 陷入休眠
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) 

    // 到这里，说明休眠结束，有读者把数据读走了
    
    KeepAlive(ep) // 里面其实是 println(ep)，用来保证读者读数据前，数据不被 GC 掉

    // 恢复 G 的状态，并将 sudog 放回缓存池
    gp.waiting = nil
    gp.activeStackChans = false
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)  // 将 sudog 放回缓存池，下次复用

    return true
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep) // 使用了 memmove 将数据复制给接收者
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
  
    // 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 
    // 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行，该处理器在下一次调度时会立刻唤醒数据的接收方
    goready(gp, skip+1)                
}
```

### 读取

Go 语言中可以使用两种不同的方式去接收 Channel 中的数据：

```go
i <- ch
i, ok <- ch
```

这两种不同的方法经过编译器的处理都会变成 `ORECV` 类型的节点，后者会在类型检查阶段被转换成 `OAS2RECV` 类型。最终都会调用 `runtime.chanrecv`。其逻辑如下：

1. 快速返回：先判断 `channel` 是否关闭或者为空，是则直接返回
2. 加锁
3. 尝试从 `sendq` 等待队列中获取等待写的协程
   1. 如果有写等待者
      1. 没有缓冲区：取出 `goroutine` 并读取数据，然后唤醒这个 `goroutine`，结束读取释放锁
      2. 有缓冲区 (有缓冲区的情况下还有等待的 `goroutine`，说明缓冲区此时满了)：从缓冲区队列队头取数据，作为返回的值；再把刚刚 `sendq` 里取出的那个 `goroutine` 放到缓冲队列队尾（保证先进先出）
   2. 如果没有写等待者
      1. 缓冲区有数据：直接读取缓冲区数据
      2. 没有缓冲区或者缓冲区为空，将当前协程加入到 `recvq` 读等待者队列，进入睡眠，等待有数据写入时被唤醒

```go
// 读取的源码和写入其实大同小异
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 1. 判断可否快速返回
    if c == nil {          
        if !block {  // select {case <-chan} 的写法为非阻塞模式，该模式下空 channel 直接返回
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) //否则休眠协程，进入阻塞
        throw("unreachable")
    }
    
    if !block && empty(c) { // 快速返回：select 模式下 channel 为空时直接返回 false
        if atomic.Load(&c.closed) == 0 {
            return
        }
        if empty(c) {       // 还做了 double check
            return true, false
        }
    }
    
    // 2. 加锁
    lock(&c.lock)
    
    // channel 关闭了且缓冲区无数据，返回一个类型默认的零值
    if c.closed != 0 && c.qcount == 0 { 
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep) // 返回 ep 类型默认的零值
        }
        return true, false
    }

    // 3.1 如果有写等待者
    if sg := c.sendq.dequeue(); sg != nil {
        // 如果 sendq 中有等待写的 goroutine，则判断 buffer
        //   如果 buffer 为空，直接从 sender 的栈中读数据
        //   否则从 buffer 头部读数据，将 sender 的数据放入 buffer 队尾
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
    
    // 3.2 没有写等待者
    // 3.2.1 缓冲区里有数据，从缓冲区读取
    if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)
        c.recvx++
        // 循环队列，下标满了则重置
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
    
    // 3.2.2 缓冲区无数据，将 g 放入 recvq 并阻塞
    mysg := acquireSudog()
    c.recvq.enqueue(mysg)

    // 立刻触发一次调度
   	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
}

// 从 sendq 里读数据
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 如果 buffer 为空，直接从 sendq 里的 sg 复制元素到 ep
    if c.dataqsiz == 0 {
        if ep != nil {
            recvDirect(c.elemtype, sg, ep)
        }
    // 如果 buffer 不为空，那么 buffer 一定是满的
    } else {
        qp := chanbuf(c, c.recvx)             // 取出 buffer 的头部元素，复制到 ep
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemmove(c.elemtype, qp, sg.elem) // 再把 sg 的值复制回头部那个元素
        c.recvx++                             // 这里很 tricky，没有用弹出头部、再插入队尾的做法
        if c.recvx == c.dataqsiz {            // 而是直接原地更新头部元素值
            c.recvx = 0                       // 由于 buffer 是个环形队列，因此更新队尾偏移量就可以了
        }
        c.sendx = c.recvx                     // 也更新 buffer 的写入偏移量
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    goready(gp, skip+1)                       // 下次调度唤醒休眠的 sudog
}
```

### close

编译器会将用于关闭管道的 `close` 关键字转换成 `OCLOSE` 节点以及 [`runtime.closechan`](https://draveness.me/golang/tree/runtime.closechan) 函数。

关闭操作将所有排队者唤醒，并设置 `closed`、`param` 字段。

```go
func closechan(c *hchan) {
    // channel 为空指针时 panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
  
    // channel 已关闭时 panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    c.closed = 1

    
    // 释放所有接收者
    var glist gList
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg) // 这个参数表名唤醒者是 closechan
        sg.success = false
        glist.push(gp)
    }

    // 释放所有发送者（发送者继续发送会panic）
    for {
        sg := c.sendq.dequeue()
        // 和上面一样
        ...
    }
    unlock(&c.lock)

    // 唤醒所有 G
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3) // 触发调度
    }
}
```

### select

对读写 `channel` 的 `<- chan`、`chan ->` 这种写法，编译器会翻译成对函数的调用。

**写**

对于 `x := <- c` 这类的阻塞操作，会编译为 `chansend1(c, x)`。

对于 `select`，则编译为 `if selectnbsend(c, x) {} else {}` 这种逻辑。

两者底层都调用的 `chansend` 函数，但传的 `block` 参数不同。`chansend1` 传的是 `true`，`select` 传 `false`。

这样在使用 `select` 的写法时，管道才能不阻塞的立即返回 `false`，`case` 才能跳过这个 `false`，无阻塞的继续向下判断 `case2`、`case3`。

```go
// c <- x 
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

// select case c <- 写法
// 编译器会将
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// 编译为
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(c, elem, false, getcallerpc())
}
```

**读**

```go
// x:= <- c 
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

// x, ok := <- c
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

// select case <- c 写法
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
    return chanrecv(c, elem, false)
}
```

**select case**

每条 `case` 会被包装成一个 `scase` 对象，里面包含了 `channel`，和数据元素。

```go
//runtime/select.go
type scase struct {
    c    *hchan         // chan
    elem unsafe.Pointer // data element
}
```

`select - case` 则会被编译为对 `selectgo` 函数的调用。里面将所有 `case` 组成了一个 `scase` 数组，以随机顺序遍历这个数组。 如果能读写数据，就返回 否则新建一个 `sudog` 放到 `scase.c` 这个 `channel` 的等待队列里，等待唤醒。 如果所有 `scase` 里都没值，则最后执行 `default`。

如果没有 `default`，则将 `sudog` 放入所有 `scase.c` 的等待队列里

```go
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
    cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))      // case 数组
    order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0)) // 顺序数组

    ncases := nsends + nrecvs                            // case 数量
    scases := cas1[:ncases:ncases]                       // case 数组
    pollorder := order1[:ncases:ncases]                  // 遍历顺序
    lockorder := order1[ncases:][:ncases:ncases]         // 加锁顺序
    
    norder := 0                                          
    for i := range scases {                 // 这个循环用于生成随机排列顺序，后续按这个顺序遍历
        cas := &scases[i]
        j := fastrandn(uint32(norder + 1))  // 居然是蓄水池抽样
        pollorder[norder] = pollorder[j]    // 以 1/i+1 的概率对调 j 和 i+1
        pollorder[j] = uint16(i)
        norder++
    }
    
    // 阶段1： 随机顺序遍历
    for _, casei := range pollorder { 
        casi = int(casei)              // 大无语命名
        cas = &scases[casi]
        c = cas.c

        // 这里会根据不同情况通过 goto 语句跳转到函数内部的不同标签执行相应的逻辑
        if casi >= nsends {
            sg = c.sendq.dequeue()   // 从等待队列里取 sudog
            if sg != nil {           // 取到了，就执行同步读
                goto recv
            }
            if c.qcount > 0 {        // 否则判断 buffer
                goto bufrecv         // 有 buffer 则读 buffer
            }
            if c.closed != 0 {
                goto rclose
            }
        } else {                     // 和上面差不多
            ...
        }
    }

    if !block {                      // select 里有 default 时，block = false，说明为非阻塞，执行这里
        selunlock(scases, lockorder) // 解锁
        casi = -1                    // 返回值为 -1
        goto retc                    // 返回
    }

    // 阶段2：所有 case 都阻塞住了，且没有 default
    // 将 select 挂到所有 case 的等待列表里，等待被唤醒
    gp = getg()
    nextp = &gp.waiting
    for _, casei := range lockorder { 
        casi = int(casei)
        cas = &scases[casi]
        c = cas.c
        sg := acquireSudog()    // 新建 sudog
        sg.g = gp
        sg.isSelect = true
        sg.elem = cas.elem
        sg.c = c
        if casi < nsends {
            c.sendq.enqueue(sg) // 挂到写等待队列
        } else {
            c.recvq.enqueue(sg) // 挂到读等待队列
        }
    }
}
```

### 产生阻塞/panic的情况

以下均不考虑 `select: case <- chan` 的情况

1. 读空的 `channel`
2. 写无缓冲 `channel` / 写有缓冲 `channel` 但数据超过缓冲区
3. 读写 `nil channel`（阻塞。主线程下会导致 `fatal error: all goroutines are asleep`）

| 操作          | nil channel | 正常 channel | 已关闭 channel |
| ----------- | ----------- | ---------- | ----------- |
| `<- ch`     | 阻塞          | 成功 / 阻塞    | 读到零值        |
| `ch <-`     | 阻塞          | 成功 / 阻塞    | `panic`     |
| `close(ch)` | `panic`     | 成功         | `panic`     |

### 样例

读空 `channel`，阻塞

```go
t := make(chan int)
x, ok := <- t //阻塞在这里。主线程被放进t的recvq了，然后主线程被挂起，等待t写入后唤醒主线程
```

读已关闭的 `channel`，正常执行

```go
t := make(chan int)
close(t)
x, ok := <- t //0, false
```

写无缓冲 `channel`，阻塞

```go
t := make(chan int)
t <- 1 //阻塞在这里，必须有其他gorutine消费才可以继续执行
```

**参考**

> [马遥 - 如何理解 Golang 中 "不要通过共享内存来通信，而应该通过通信来共享内存"](https://www.zhihu.com/question/58004055/answer/1972944380)
>
> [Go channel 实现原理分析](https://www.jianshu.com/p/d841f251d3bc)
>
> [Go channel 源码](https://github.com/golang/go/blob/go1.12.7/src/runtime/chan.go#L183)
