# 4.Channel的实现

Go 语言中最常见的、也是经常被人提及的设计模式就是：不要通过共享内存的方式进行通信，而是应该通过通信的方式共享内存。在很多主流的编程语言中，多个线程传递数据的方式一般都是共享内存，为了解决线程竞争，我们需要限制同一时间能够读写这些变量的线程数量，然而这与 Go 语言鼓励的设计并不相同。

Go 语言使用了一种不同的并发模型，即通信顺序进程（Communicating sequential processes，CSP）。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介，Goroutine 之间会通过 Channel 传递数据。

## 1. 数据结构

核心结构为：

1. 一把锁
2. 两个链表，存放等待读和等待写的协程
3. 有缓冲区的话，一个环型链表作为缓冲区

在一些关键路径上通过 `CAS` 实现无锁快速操作，提升性能

```go
//src/runtime/chan.go
type hchan struct {
    qcount   uint           // 当前队列里的元素个数
    dataqsiz uint           // 环形队列长度，即缓冲区的大小，即 make(chan T, N) 中的 N
    buf      unsafe.Pointer // 环形队列
    elemsize uint16         // 每个元素的大小
    closed   uint32         // 标识当前通道是否处于关闭状态，创建通道后，该字段设置0，即打开通道；通道调用close将其设置为1，通道关闭
    elemtype *_type         // 元素类型，用于数据传递过程中的赋值
    sendx    uint           // 环形队列尾指针
    recvx    uint           // 环形队列头指针

    recvq waitq             // 等待读消息的goroutine队列
    sendq waitq             // 等待写消息的goroutine队列

    lock mutex              //互斥锁
}
  
// sudog 代表 goroutine
type waitq struct {
    first *sudog // 这个是链表，通过next指向下一个sudog
    last  *sudog // 链表尾部
}
```

### 基本用法

```go
_, ok := <-ch // ok仅用于判断通道是否关闭，不用于判断是否有数据

// select 用法
select {
	case c := <- ch:
		println(c)
    default:
        println("default")
}
```

### 写入

当我们想要向 Channel 发送数据时，就需要使用 `ch <- i` 语句，编译器会将它解析成 `OSEND` 节点并在 `cmd/compile/internal/gc.walkexpr` 中转换成 `runtime.chansend1` 函数，里面实际调用 `chansend` 函数，其逻辑如下：

1. 加锁
2. 如果 `recvq` 中有等待读的协程，则直接将元素写入该协程的栈里，并修改协程状态，在下次调度时唤醒它
   1. 这步节省了一个锁和内存 `copy` 的步骤。一般共享内存是：G1加锁、G1写入堆、G1解锁；G2加锁、G2读堆拷贝到栈、G2解锁
   2. 这里直接是：G1加锁、G1拷贝数据到G2的栈、G1唤醒G2、G1解锁
3. 如果 `recvq` 为空，则检查缓冲区是否可用，如果可用就复制数据到缓冲区中，并更新队列索引
4. 如果缓冲区已经满了，则
   1. 将协程包装成 `sudog` 链表节点结构，数据保存到 `sudog.elem` 字段，然后将节点追加到写等待者队列（ `sendq` 链表）中
   2. `gopark` 该协程
5. 写入完成释放锁

```go
// runtime/chan.go
// 注意这里的 block 并非表示 channel 是阻塞/非阻塞的，而是表示是否通过 select: case chan <- data 的方式来访问
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {                                      // select 模式下空 channel 直接返回；否则报错
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) // 向 nil channel 写会直接 gopark，进入阻塞
        throw("unreachable")
    }
    
    if !block && c.closed == 0 && full(c) {            // 快速返回：select 模式下 channel 写满时直接返回 false
        return false
    }
		
  	// 1. 加锁
    lock(&c.lock)

    // 2. 如果等待队列里有等待的读者，直接把数据拷贝到它的栈里；并在下次调度时唤醒它
    if sg := c.recvq.dequeue(); sg != nil {            
        send(c, sg, ep, func() { unlock(&c.lock) }, 3) // send 里面使用了 memmove 将数据复制给接收者
        return true
    }

    // 3. 缓冲区还有空间，写入缓冲区并更新索引
    if c.qcount < c.dataqsiz {
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // 4.1 缓冲区没有空间了，非阻塞模式（通过 select 方法写）下直接返回
    if !block {
        unlock(&c.lock)
        return false
    }

    // 4.2 同步模式下则会挂起当前协程，放到写等待队列里
    gp := getg()
    mysg := acquireSudog() // 复用或新建一个 sudog
    mysg.releasetime = 0

    mysg.elem = ep         // 保存发送的数据
    mysg.g = gp            
    mysg.c = c
    c.sendq.enqueue(mysg)  // 将 sudog 放入发送者队列
    
    // 陷入休眠
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) 

    // 到这里，说明休眠结束，有读者把数据读走了
    
    KeepAlive(ep) // 里面其实是 println(ep)，用来保证读者读数据前，数据不被 GC 掉

    // 恢复 G 的状态，并将 sudog 放回缓存池
    gp.waiting = nil
    gp.activeStackChans = false
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)  // 将 sudog 放回缓存池，下次复用

    return true
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep) // 使用了 memmove 将数据复制给接收者
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
  
    // 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 
    // 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行，该处理器在下一次调度时会立刻唤醒数据的接收方
    goready(gp, skip+1)                
}
```

### 读取

Go 语言中可以使用两种不同的方式去接收 Channel 中的数据：

```go
i <- ch
i, ok <- ch
```

这两种不同的方法经过编译器的处理都会变成 `ORECV` 类型的节点，后者会在类型检查阶段被转换成 `OAS2RECV` 类型。最终都会调用 `runtime.chanrecv`。其逻辑如下：

1. 快速返回：先判断 `channel` 是否关闭或者为空，是则直接返回
2. 加锁
3. 尝试从 `sendq` 等待队列中获取等待写的协程
   1. 如果有写等待者
      1. 没有缓冲区：取出 `goroutine` 并读取数据，然后唤醒这个 `goroutine`，结束读取释放锁
      2. 有缓冲区 (有缓冲区的情况下还有等待的 `goroutine`，说明缓冲区此时满了)：从缓冲区队列队头取数据，作为返回的值；再把刚刚 `sendq` 里取出的那个 `goroutine` 放到缓冲队列队尾（保证先进先出）
   2. 如果没有写等待者
      1. 缓冲区有数据：直接读取缓冲区数据
      2. 没有缓冲区或者缓冲区为空，将当前协程加入到 `recvq` 读等待者队列，进入睡眠，等待有数据写入时被唤醒

```go
// 读取的源码和写入其实大同小异
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 1. 判断可否快速返回
    if c == nil {          
        if !block {  // select {case <-chan} 的写法为非阻塞模式，该模式下空 channel 直接返回
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) //否则休眠协程，进入阻塞
        throw("unreachable")
    }
    
    if !block && empty(c) { // 快速返回：select 模式下 channel 为空时直接返回 false
        if atomic.Load(&c.closed) == 0 {
            return
        }
        if empty(c) {       // 还做了 double check
            return true, false
        }
    }
    
    // 2. 加锁
    lock(&c.lock)
    
    // channel 关闭了且缓冲区无数据，返回一个类型默认的零值
    if c.closed != 0 && c.qcount == 0 { 
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep) // 返回 ep 类型默认的零值
        }
        return true, false
    }

    // 3.1 如果有写等待者
    if sg := c.sendq.dequeue(); sg != nil {
        // 如果 sendq 中有等待写的 goroutine，则判断 buffer
        //   如果 buffer 为空，直接从 sender 的栈中读数据
        //   否则从 buffer 头部读数据，将 sender 的数据放入 buffer 队尾
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
    
    // 3.2 没有写等待者
    // 3.2.1 缓冲区里有数据，从缓冲区读取
    if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)
        c.recvx++
        // 循环队列，下标满了则重置
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
    
    // 3.2.2 缓冲区无数据，将 g 放入 recvq 并阻塞
    mysg := acquireSudog()
    c.recvq.enqueue(mysg)

    // 立刻触发一次调度
   	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
}

// 从 sendq 里读数据
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 如果 buffer 为空，直接从 sendq 里的 sg 复制元素到 ep
    if c.dataqsiz == 0 {
        if ep != nil {
            recvDirect(c.elemtype, sg, ep)
        }
    // 如果 buffer 不为空，那么 buffer 一定是满的
    } else {
        qp := chanbuf(c, c.recvx)             // 取出 buffer 的头部元素，复制到 ep
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemmove(c.elemtype, qp, sg.elem) // 再把 sg 的值复制回头部那个元素
        c.recvx++                             // 这里很 tricky，没有用弹出头部、再插入队尾的做法
        if c.recvx == c.dataqsiz {            // 而是直接原地更新头部元素值
            c.recvx = 0                       // 由于 buffer 是个环形队列，因此更新队尾偏移量就可以了
        }
        c.sendx = c.recvx                     // 也更新 buffer 的写入偏移量
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    goready(gp, skip+1)                       // 下次调度唤醒休眠的 sudog
}
```

### close

编译器会将用于关闭管道的 `close` 关键字转换成 `OCLOSE` 节点以及 [`runtime.closechan`](https://draveness.me/golang/tree/runtime.closechan) 函数。

关闭操作将所有排队者唤醒，并设置 `closed`、`param` 字段。

```go
func closechan(c *hchan) {
    // channel 为空指针时 panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
  
    // channel 已关闭时 panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    c.closed = 1

    
    // 释放所有接收者
    var glist gList
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg) // 这个参数表名唤醒者是 closechan
        sg.success = false
        glist.push(gp)
    }

    // 释放所有发送者（发送者继续发送会panic）
    for {
        sg := c.sendq.dequeue()
        // 和上面一样
        ...
    }
    unlock(&c.lock)

    // 唤醒所有 G
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3) // 触发调度
    }
}
```

### select

对读写 `channel` 的 `<- chan`、`chan ->` 这种写法，编译器会翻译成对函数的调用。

**写**

对于 `x := <- c` 这类的阻塞操作，会编译为 `chansend1(c, x)`。

对于 `select`，则编译为 `if selectnbsend(c, x) {} else {}` 这种逻辑。

两者底层都调用的 `chansend` 函数，但传的 `block` 参数不同。`chansend1` 传的是 `true`，`select` 传 `false`。

这样在使用 `select` 的写法时，管道才能不阻塞的立即返回 `false`，`case` 才能跳过这个 `false`，无阻塞的继续向下判断 `case2`、`case3`。

```go
// c <- x 
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

// select case c <- 写法
// 编译器会将
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// 编译为
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    return chansend(c, elem, false, getcallerpc())
}
```

**读**

```go
// x:= <- c 
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

// x, ok := <- c
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

// select case <- c 写法
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
    return chanrecv(c, elem, false)
}
```

**select case**

每条 `case` 会被包装成一个 `scase` 对象，里面包含了 `channel`，和数据元素。

```go
//runtime/select.go
type scase struct {
    c    *hchan         // chan
    elem unsafe.Pointer // data element
}
```

`select - case` 则会被编译为对 `selectgo` 函数的调用。里面将所有 `case` 组成了一个 `scase` 数组，以随机顺序遍历这个数组。 如果能读写数据，就返回 否则新建一个 `sudog` 放到 `scase.c` 这个 `channel` 的等待队列里，等待唤醒。 如果所有 `scase` 里都没值，则最后执行 `default`。

如果没有 `default`，则将 `sudog` 放入所有 `scase.c` 的等待队列里

```go
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
    cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))      // case 数组
    order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0)) // 顺序数组

    ncases := nsends + nrecvs                            // case 数量
    scases := cas1[:ncases:ncases]                       // case 数组
    pollorder := order1[:ncases:ncases]                  // 遍历顺序
    lockorder := order1[ncases:][:ncases:ncases]         // 加锁顺序
    
    norder := 0                                          
    for i := range scases {                 // 这个循环用于生成随机排列顺序，后续按这个顺序遍历
        cas := &scases[i]
        j := fastrandn(uint32(norder + 1))  // 居然是蓄水池抽样
        pollorder[norder] = pollorder[j]    // 以 1/i+1 的概率对调 j 和 i+1
        pollorder[j] = uint16(i)
        norder++
    }
    
    // 阶段1： 随机顺序遍历
    for _, casei := range pollorder { 
        casi = int(casei)              // 大无语命名
        cas = &scases[casi]
        c = cas.c

        // 这里会根据不同情况通过 goto 语句跳转到函数内部的不同标签执行相应的逻辑
        if casi >= nsends {
            sg = c.sendq.dequeue()   // 从等待队列里取 sudog
            if sg != nil {           // 取到了，就执行同步读
                goto recv
            }
            if c.qcount > 0 {        // 否则判断 buffer
                goto bufrecv         // 有 buffer 则读 buffer
            }
            if c.closed != 0 {
                goto rclose
            }
        } else {                     // 和上面差不多
            ...
        }
    }

    if !block {                      // select 里有 default 时，block = false，说明为非阻塞，执行这里
        selunlock(scases, lockorder) // 解锁
        casi = -1                    // 返回值为 -1
        goto retc                    // 返回
    }

    // 阶段2：所有 case 都阻塞住了，且没有 default
    // 将 select 挂到所有 case 的等待列表里，等待被唤醒
    gp = getg()
    nextp = &gp.waiting
    for _, casei := range lockorder { 
        casi = int(casei)
        cas = &scases[casi]
        c = cas.c
        sg := acquireSudog()    // 新建 sudog
        sg.g = gp
        sg.isSelect = true
        sg.elem = cas.elem
        sg.c = c
        if casi < nsends {
            c.sendq.enqueue(sg) // 挂到写等待队列
        } else {
            c.recvq.enqueue(sg) // 挂到读等待队列
        }
    }
}
```

### 产生阻塞/panic的情况

以下均不考虑 `select: case <- chan` 的情况

1. 读空的 `channel`
2. 写无缓冲 `channel` / 写有缓冲 `channel` 但数据超过缓冲区
3. 读写 `nil channel`（阻塞。主线程下会导致 `fatal error: all goroutines are asleep`）

| 操作          | nil channel | 正常 channel | 已关闭 channel |
| ----------- | ----------- | ---------- | ----------- |
| `<- ch`     | 阻塞          | 成功 / 阻塞    | 读到零值        |
| `ch <-`     | 阻塞          | 成功 / 阻塞    | `panic`     |
| `close(ch)` | `panic`     | 成功         | `panic`     |

### 样例

读空 `channel`，阻塞

```go
t := make(chan int)
x, ok := <- t //阻塞在这里。主线程被放进t的recvq了，然后主线程被挂起，等待t写入后唤醒主线程
```

读已关闭的 `channel`，正常执行

```go
t := make(chan int)
close(t)
x, ok := <- t //0, false
```

写无缓冲 `channel`，阻塞

```go
t := make(chan int)
t <- 1 //阻塞在这里，必须有其他gorutine消费才可以继续执行
```

**参考**

> [马遥 - 如何理解 Golang 中 "不要通过共享内存来通信，而应该通过通信来共享内存"](https://www.zhihu.com/question/58004055/answer/1972944380)
>
> [Go channel 实现原理分析](https://www.jianshu.com/p/d841f251d3bc)
>
> [Go channel 源码](https://github.com/golang/go/blob/go1.12.7/src/runtime/chan.go#L183)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://wtifs.gitbook.io/diva-notes/go/5.-bing-fa-bian-cheng/4.channel-de-shi-xian.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
