diva-notes
  • README
  • Ads
    • 定价策略
    • 广告层级
    • 归因模型
    • 买量
    • Chat GPT
    • Google
  • AI
    • 参考资料
    • Chat GPT
    • stable-diffusion-webui安装
  • Algorithm
    • 倍增
    • 并查集
    • 参考
    • 环的判断
    • 凸包
    • 蓄水池抽样
    • 最短路径
    • 最小生成树
    • KMP算法
    • Rabin-Karp算法
    • Tarjan桥算法
  • Architecture
    • Serverless
  • Career
  • CICD
    • 代码质量
    • CICD实践
  • Data Structure
    • 布谷鸟过滤器
    • 布隆过滤器
    • 浮点
    • 红黑树
    • 锁
    • LSM树
  • DB
    • My SQL
      • 隔离级别
      • 架构
      • 索引
      • 锁
      • 页结构
      • 主从同步
      • ACID
      • Log
      • MVCC
      • Questions
    • Postgres
      • 持久化
      • 对比MySQL
      • 隔离级别
      • 索引
      • Greenpulm
      • MVCC
    • 倒排索引
    • 列式存储
    • H Base
    • HDFS
    • MPP数据库选型
    • Questions
  • Distributed System
    • 分布式事务
    • 服务网格
    • BASE理论
    • CAP
    • Etcd
    • Raft协议
    • ZAB协议
  • Go
    • 1.语言基础
      • 1.CPU寄存器
      • 2-1.函数调用
      • 2-2.函数调用栈
      • 2.接口
      • 3.汇编
      • 4.调试
    • 2.编译
      • 1.编译
      • 2.词法与语法分析
      • 3.类型检查
      • 4.中间代码生成
      • 5.机器码生成
    • 3.数据结构
      • 1.数组array
      • 2.切片slice
      • 3.哈希表map
      • 4.字符串
    • 4.常用关键字
      • 1.循环
      • 2.defer
      • 3.panic和recover
      • 4.make和new
    • 5.并发编程
      • 1.上下文Context的实现
      • 2-1.runtime.sema信号量
      • 2-2.sync.Mutex的实现
      • 2-3.sync.WaitGroup
      • 2-4.sync.Once的实现
      • 2-5.sync.Map的实现
      • 2-6.sync.Cond
      • 2-7.sync.Pool的实现
      • 2-8.sync.Semaphore的实现
      • 2-9.sync.ErrGroup
      • 3.定时器Timer的实现
      • 4.Channel的实现
      • 5-1.调度-线程
      • 5-2.调度-MPG
      • 5-3.调度-程序及调度启动
      • 5-4.调度-调度策略
      • 5-5.调度-抢占
      • 6.netpoll实现
      • 7.atomic
    • 6.内存管理
      • 1-1.内存分配基础-TCmalloc
      • 1-2.内存分配
      • 2.垃圾回收
      • 3.栈内存管理
    • 参考
    • 各版本特性
    • 坑
    • Go程序性能优化
    • http.Client
    • net.http路由
    • profile采样的实现
    • Questions
    • time的设计
  • Kafka
    • 高可用
    • 架构
    • 消息队列选型
    • ISR
    • Questions
  • Network
    • ARP
    • DNS
    • DPVS
    • GET和POST
    • HTTP 2
    • HTTP 3
    • HTTPS
    • LVS的转发模式
    • NAT
    • Nginx
    • OSI七层模型
    • Protobuf
    • Questions
    • REST Ful
    • RPC
    • socket缓冲区
    • socket详解
    • TCP滑动窗口
    • TCP连接建立源码
    • TCP连接四元组
    • TCP三次握手
    • TCP数据结构
    • TCP四次挥手
    • TCP拥塞控制
    • TCP重传机制
    • UDP
  • OS
    • 磁盘IO
    • 调度
    • 进程VS线程
    • 零拷贝
    • 内存-虚拟内存
    • 内存分配
    • 用户态VS内核态
    • 中断
    • COW写时复制
    • IO多路复用
    • Questions
  • Redis
    • 安装
    • 参考
    • 高可用-持久化
    • 高可用-主从同步
    • 高可用-Cluster
    • 高可用-Sentinel
    • 缓存一致性
    • 事务
    • 数据结构-SDS
    • 数据结构-Skiplist
    • 数据结构-Ziplist
    • 数据结构
    • 数据类型-Hashtable
    • 数据类型-List
    • 数据类型-Set
    • 数据类型-Zset
    • 数据淘汰机制
    • 通信协议-RESP
    • Questions
    • Redis6.0多线程
    • Redis分布式锁
    • Redis分片
  • System Design
    • 本地缓存
    • 错误处理
    • 大文件处理
    • 点赞收藏关注
    • 短链接生成系统
    • 负载均衡
    • 高并发高可用
    • 规则引擎
    • 集卡活动
    • 秒杀系统
    • 评论系统
    • 熔断
    • 限流
    • 延迟队列
    • Docker
    • ES
    • K 8 S
    • Node.js
    • Questions
  • Work
    • Bash
    • Charles
    • Code Review
    • Ffmpeg
    • Git
    • intellij插件
    • I Term 2
    • Mac
    • mysql命令
    • Nginx
    • postgresql命令
    • Protoc
    • Ssh
    • Systemd
    • Tcp相关命令
    • Vim
Powered by GitBook
On this page
  • 客户端
  • 服务端
  • net.Listen() 的实现
  • listen.Accept() 的实现
  • netpoll 的调度
  • 总结
  1. Go
  2. 5.并发编程

6.netpoll实现

Previous5-5.调度-抢占Next7.atomic

Last updated 2 years ago

Go语言值得称道的一点就是用很简单的几行代码就可以写出支持高并发的服务器程序。那么 Go语言里的 TCP 连接是怎样建立的呢?

这里分客户端和服务端两部分看

客户端

客户端一般通过如下代码发送请求:

func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:3000")
    conn.Write([]byte(data)) // 向服务端发送数据
    n,err := conn.Read(buf)  //读取服务端端数据
}

net.Dial() 底层实际调用的是系统函数 socket() 和 connet() 创建 socket连接,write() 和 read() 读写数据。

socket (套接字) 是 Unix系统下抽象出来的一层概念,与 Unix系统 file descriptor (文件描述符) 相整合,使得网络读写数据和本地文件一样容易。一般记录五元组(协议 + 双方地址 + 双方端口)

这里不详细解析,主要看服务端下的实现

服务端

服务端一般通过如下代码接收客户端请求:

func main()  {
    listen, err := net.Listen("tcp",":8080") // 创建监听 socket
    for {
        conn, errs := listen.Accept()       // 接收客户端连接
        go handle(conn) 					          // 一个 goroutine 处理一个连接
    }
}

net.Listen() 的实现

net.Listen() 经过层层调用,最底层实际调用的是如下系统函数:

  1. socket() 创建 socket fd

  2. bind() 绑定 socket 与监听地址

  3. listen() 监听 socket

    1. epollcreate() 创建 epoll 对象

    2. epollctl() 将监听 socket fd 加入到 epoll 红黑树里进行监听

// net/dial.go
func Listen(network, address string) (Listener, error) {
    var lc ListenConfig
    return lc.Listen(context.Background(), network, address)
}

// lc.Listen()
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
    sl := &sysListener{}
    case *TCPAddr:
        l, err = sl.listenTCP(ctx, la)
    case *UnixAddr:
        l, err = sl.listenUnix(ctx, la)
    }
}
// net/tcpsock_posix
// sysListener.listenTCP()
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
    fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
}
// net/ipsock_posix.go
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

重点看这个 net/sock_posix.go/socket() 函数

// net/sock_posix.go
// socket()
// laddr: local address, radd: remote address
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    s, err := sysSocket(family, sotype, proto) // 调用 syscall.Socket() 创建系统 socket 对象
    fd, err = newFD(s, family, sotype, net)    // new 一个 fd 结构体
    if laddr != nil && raddr == nil {          // 远端地址为空,表示监听。服务端进入这个 if 分支
        switch sotype {
            case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
                fd.listenStream(laddr, listenerBacklog(), ctrlFn) // 绑定端口并监听
        }
    }
}

// listenStream()
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    syscall.Bind(fd.pfd.Sysfd, lsa)    // syscall.Bind() 将监听地址绑定到 socket 上
    listenFunc(fd.pfd.Sysfd, backlog)  // syscall.Listen() 监听,backlog 参数控制连接队列长度,取自系统参数 /proc/sys/net/core/somaxconn
    fd.init()                          // epoll 对象初始化和跟踪
}

fd.init() 里主要是对 epoll 对象的创建和跟踪,实现如下:

// net/fd_unix.go
// fd.init()
func (fd *netFD) init() error {
    return fd.pfd.Init(fd.net, true)
}

func (fd *FD) Init(net string, pollable bool) error {
    err := fd.pd.init(fd)
}
// poll/fd_poll_runtime.go
func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit) 
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
}

runtime_pollServerInit() 函数通过 go:linkname 注释由 runtime/netpoll.go/poll_runtime_pollServerInit() 实现,底层调用 epoll_create1() 函数,创建 epoll 对象,并通过 atomic 包保证只创建一次

// runtime/netpoll.go
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    if atomic.Load(&netpollInited) == 0 { // 用一个标记变量保证只创建一次
        netpollinit() // 根据操作系统有不同的实现,linux 下是 epoll,MacOS 下是 kqueue
    }
}

netpollinit() 函数在 linux 系统下的实现文件为 runtime/netpoll_epoll.go

// runtime/netpoll_epoll.go
func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC) // 创建 epoll 对象;使用汇编实现,实际调用 linux epoll_create1() 函数
}

runtime_pollOpen() 函数由 runtime/netpoll.go/net_runtime_pollOpen() 实现,底层调用 epoll_ctl() 函数,将 socket fd 放入 epoll 对象中监听,以便在和客户端的连接建立时得到通知

// runtime/netpoll.go
// go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    errno := netpollopen(fd, pd)
}

这里可以看到通过常量 _EPOLLET 将 epoll 设置为了边缘触发 (Edge Triggered) 模式,并将 fd 通过 epollctr 放入 epoll 对象中管理。

// runtime/netpoll_epoll.go
func netpollopen(fd uintptr, pd *pollDesc) int32 {
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET // EPOLLET -> Edge Triggered 边缘触发模式
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) // 实际调用 linux epoll_ctl()
}

listen.Accept() 的实现

listen.Accept() 的逻辑主要是:

  1. 轮询调用系统函数 accept() 等待并接收连接

  2. 连接到来后,调用 epollcreate() 和 epollctl() (和上面一样)管理 socket

// net/tcpsock.go
func (l *TCPListener) Accept() (Conn, error) {
    c, err := l.accept() // accept() 函数会阻塞式的等待下一个连接
}

func (ln *TCPListener) accept() (*TCPConn, error) {
    fd, err := ln.fd.accept()
}
// net/fd_unix.go
func (fd *netFD) accept() (netfd *netFD, err error) {
    d, rsa, errcall, err := fd.pfd.Accept() // 等待连接

    netfd, err = newFD(d, fd.family, fd.sotype, fd.net) // 为连接新建 fd
    netfd.init() // 这个和前面的 init() 一样,创建和维护 epoll 对象
}

下面这里是个 for循环,轮询调用 accept 函数。因为我们在 Listen 的时候已经把对应的 Listener fd 设置成非阻塞I/O了,所以调用accept 这一步是不会阻塞的。只是下面会进行判断,根据判断 err ==syscall.EAGAIN 来调用fd.pd.waitRead阻塞住用户程序。

// poll/fx_unix.go
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    for {
        s, rsa, errcall, err := accept(fd.Sysfd) // 系统调用 accept()
        switch err {
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil { // 没有获取到连接,则阻塞协程,底层调用 gopark(),只阻塞当前 G,不影响 M
                    continue
                }
            }
        }
    }
}

netpoll 的调度

前面可以看到,在 Listen() 和 Accept() 里创建并维护了 epoll 对象,那么什么时候会调用 epoll_wait() 获取就绪的 socket 呢?

Go 里通过 runtime.netpoll() 来获取就绪的 socket,这个函数调用的地方主要有两处,在 Go 的调度函数里:

  1. 触发调度的函数 runtime.shcedule() -> runtime.findrunable() 中调用了 runtime.netpoll() 获取待执行的协程

  2. sysmon 监控协程 每次运行会检查距离上一次执行 netpoll() 函数是否超过10ms,如果是则会调用一次 runtime.netpoll()

// runtime/proc.go
func findrunnable() (gp *g, inheritTime bool) {
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        list := netpoll(0)
    }
}

func sysmon() {
	for {
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
            list := netpoll(0) 
        }
    }
}

netpoll() 解析

netpoll() 里调用了 epollwait() 系统调用函数。如果返回的值大于 0,意味着被监控的文件描述符出现了待处理的事件,将这些协程放入 toRun 列表返回给上层调度函数处理。

// runtime/netpoll_epoll.go
func netpoll(delay int64) gList {
    // 这里进行了系统调用 epoll_wait(),将就绪的 fd 写入 events 数组,返回 fd 数量
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)

    var toRun gList
    for i := int32(0); i < n; i++ { // 遍历就绪的 fd,放入 toRun 数组并返回给上层函数调度
        ev := &events[i]
		    pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
		
        netpollready(&toRun, pd, mode)
    }
}
// runtime/netpoll.go
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    toRun.push(wg)
}

总结

netpoll 底层就是对I/O多路复用的封装,是 I/O多路复用 + Go调度器 二者的结合。

不同平台对I/O多路复用有不同的实现方式,Go 在不同平台上 netpoll 调用的底层实现也不一样。比如Linux 下使用 epoll,macOS 则是 kqueue,而 Windows 是基于异步I/O实现的 ICOP。编译器在编译 Go 语言程序时,会根据目标平台选择树中特定的分支进行编译。

优点

  1. 每个 goroutine 监听一个 TCP连接,轻量且支持海量

    1. 当连接上没有数据到达时,goroutine 会被 gopark() 函数阻塞。该阻塞不会陷入内核态,也不阻塞 M,M 可以寻找别的 G 执行,切换 G的开销极小

    2. 有数据到达时,再通过运行时调度处理连接

  2. 底层调用 epoll IO多路复用机制,较为高效

不足

  1. 海量连接场景下,goroutine 及内存使用会暴涨,且当前的垃圾回收机制下不会随连接销毁释放

  2. 所有连接维护在一个 epoll 对象里,高频创建和释放连接情况下可能导致性能瓶颈

img