前言
如果想兼顾开发效率,又能保证高并发,协程就是最好的选择。它可以在保持异步化运行机制的同时,用同步方式写代码(goroutine-per-connection),这在实现高并发的同时,缩短了开发周期,是高性能服务未来的发展方向。
- CPU 和 IO 设备是不同的设备,能并行运行。合理调度程序,充分利用硬件,就能跑出很好的性能;
- Go 的 IO 最最核心的是 io 库,除了定义 interface (Reader/Writer),还实现了通用的函数,比如 Copy 之类的;
- 内存字节数组可以作为 Reader ,Writer ,实现在 bytes 库中,字符串可以作为 Reader,实现在 strings 库中,strings.NewReader;网络句柄可以作为 Reader ,Writer ,实现在 net 库中,net.Conn;文件句柄可以作为 Reader ,Writer ,实现在 os 库中,os.File ;
整体理念
Go语言TCP Socket编程从tcp socket诞生后,网络编程架构模型也几经演化,大致是:“每进程一个连接” –> “每线程一个连接” –> “Non-Block + I/O多路复用(linux epoll/windows iocp/freebsd darwin kqueue/solaris Event Port)”。伴随着模型的演化,服务程序愈加强大,可以支持更多的连接,获得更好的处理性能。不过I/O多路复用也给使用者带来了不小的复杂度,以至于后续出现了许多高性能的I/O多路复用框架, 比如libevent、libev、libuv等,以帮助开发者简化开发复杂性,降低心智负担。不过Go的设计者似乎认为I/O多路复用的这种通过回调机制割裂控制流的方式依旧复杂,且有悖于“一般逻辑”设计,为此Go语言将该“复杂性”隐藏在Runtime中了:Go开发者无需关注socket是否是 non-block的,也无需亲自注册文件描述符的回调,只需在每个连接对应的goroutine中以“block I/O”的方式对待socket处理即可。
The Go netpollerIn Go, all I/O is blocking. The Go ecosystem is built around the idea that you write against a blocking interface and then handle concurrency through goroutines and channels rather than callbacks and futures.An example is the HTTP server in the “net/http” package. Whenever it accepts a connection, it will create a new goroutine to handle all the requests that will happen on that connection. This construct means that the request handler can be written in a very straightforward manner. First do this, then do that. Unfortunately, using the blocking I/O provided by the operating system isn’t suitable for constructing our own blocking I/O interface.
netty 在屏蔽java nio底层细节方面做得不错, 但因为java/jvm的限制,“回调机制割裂控制流”的问题依然无法避免。
原理
- 多路复用 有赖于 linux 的epoll 机制,具体的说 是 epoll_create/epoll_ctl/epoll_wait 三个函数,所谓编程语言多路复用,就是封装和触发epoll_xx函数。
- epoll 机制包含 两个fd: epfd 和 待读写数据的fd(比如socket)。先创建efpd(对应一个红黑树),然后向epfd 注册fd事件(key=fd,value=pollDesc,还附带一个事件类型), 之后触发epoll_wait 轮询注册在epfd 的fd 事件发生了没有。
- netpoller 负责将 操作系统 提供的nio 转换为 goroutine 支持的blocking io。为屏蔽linux、windows 等底层nio 接口的差异,netpoller 定义一个 虚拟接口来封装底层接口。
func poll_init() // 底层调用 epoll_create 指令,完成epoll 事件表的初始化 func poll_open(...) // 构造与 fd 对应的 pollDesc实例,其中含有事件状态标识器 rg/wg,用于标识事件状态以及存储因poll_wait 而阻塞的 goroutine实例;接下来通过 epoll_ctl(ADD)操作,将 fd(key) 与 pollDesc(value) 注册到 epoll事件表中 func poll_close(...) // 执行 epoll_ctl(DEL)操作,将 pollDesc 对应 fd 从 epoll 事件表中移除 func poll_wait(...) // 当 g 依赖的某io 事件未就绪时,会通过 gopark 操作,将 g 置为阻塞态,并将 g 实例存放在 pollDesc 的事件状态标识器 rg/wg 中 func net_poll() // gmp 调度流程会轮询驱动 netpoll 流程,通常以非阻塞模式发起 epoll_wait 指令,取出所有就绪的 pollDesc,通过事件标识器取得此前因 gopark 操作而陷入阻塞态的 g,返回给上游用于唤醒和调度(在 gc和 sysmon 也触发 netpoll 流程)
本文 主要讲 netpoller 基于 linux 的epoll 接口 的实现 Go netpoller 网络模型之源码全面解析
I/O 多路复用需要使用特定的系统调用(epoll_create/epoll_ctl/epoll_wait),socket/fd.create 触发epoll_ctl(创建fd,bind fd 和port,将fd挂到epoll 红黑树上),java 和 python 必须单独提一个eventloop 组件触发epoll_create/epoll_wait,go 提了一个netpoller 封装epoll_create/epoll_wait, 在runtime里出发epoll_wait
- Goroutine 视角,Goroutine 让出线程并等待读写事件:当我们在文件描述符上执行读写操作时,如果文件描述符不可读或者不可写,当前 Goroutine 就会执行
runtime.poll_runtime_pollWait
检查runtime.pollDesc
的状态并调用runtime.netpollblock
等待文件描述符的可读或者可写。runtime.netpollblock
会使用运行时提供的runtime.gopark
让出当前线程,将 Goroutine 转换到休眠状态并等待运行时的唤醒。 - netpoller 视角,多路复用等待读写事件的发生并返回:netpoller并不是由runtime中的某一个线程独立运行的,
- runtime中的调度和系统调用会通过 runtime.netpoll 与网络轮询器交换消息,在找到就绪的 pollDesc 后,进一步获取待执行的 Goroutine 列表,恢复(goready)Goroutine 为运行状态,并将待执行的 Goroutine 加入运行队列等待处理。PS:netpoller 也是gmp中可用可运行g的一个来源。
实现
服务端逻辑
在c、java等传统编程语言中,listen所做的事情就是直接调用内核的listen systemcall,在go net.Listen里干的活儿就多了,封装度非常高,更大程度的对程序员屏蔽了底层实现的细节在 golang 中是如何对 epoll 进行封装的? PS:epoll_xx操作隐藏在常规的阻塞io socket操作(listen/accept)内
- 在 golang net 的 listen 中,会完成如下几件事:
- 创建 socket 并设置非阻塞
- bind 绑定并监听本地的一个端口
- 调用 listen 开始监听
- epoll_create 创建一个 epoll 对象
- epoll_etl 将 listen 的 socket 添加到 epoll 中等待连接到来
- Accept主要做了三件事
- 调用 accept 系统调用接收一个连接
- 如果没有连接到达,把当前协程阻塞掉
- 新连接到来的话,将其添加到 epoll 中管理,然后返回
- Read 调用read systemcall 来读取数据,如果数据尚未到达,则把自己阻塞起来。
- Write 的大体过程和 Read 是类似的。先是调用 Write 系统调用发送数据,如果内核发送缓存区不足的时候,就把自己先阻塞起来,然后等可写事件发生的时候再继续发送。
当要等待的事件就绪的时候,被阻塞掉的协程又是如何被重新调度的呢?
- Go 语言的运行时会在调度或者系统监控中调用sysmon,它会调用 netpoll,来不断地调用 epoll_wait 来查看 epoll 对象所管理的文件描述符中哪一个有事件就绪需要被处理了。如果有,就唤醒对应的协程来进行执行。
- 除此之外还有几个地方会唤醒协程,如
- startTheWorldWithSema
- findrunnable 在 schedule 中调用 有top 和 stop 之分。其中 stop 中会导致阻塞。
- pollWork
核心数据结构
connect/accept/read/write 都会 转换为 pollDesc 操作。
调用 internal/poll.pollDesc.init
初始化文件描述符时不止会初始化网络轮询器,会通过 runtime.poll_runtime_pollOpen
函数重置轮询信息 runtime.pollDesc
并调用 runtime.netpollopen
初始化轮询事件。runtime.netpollopen
会调用 epollctl 向全局的轮询文件描述符 epfd 中加入新的轮询事件监听文件描述符的可读和可写状态
轮询 以获取 可执行的Goroutine
这里类似 netty 的eventloop
// src/runtime/netpoll_epoll.go
func netpoll(delay int64) gList {
// 根据传入的 delay 计算 epoll 系统调用需要等待的时间;
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
waitms = 1e9
}
var events [128]epollevent
retry:
// 调用 epollwait 等待可读或者可写事件的发生;
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if waitms > 0 {
return gList{}
}
goto retry
}
// 在循环中依次处理 epollevent 事件;
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
...
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
...
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
netpollready(&toRun, pd, mode)
}
}
return toRun
计算了需要等待的时间之后,runtime.netpoll 会执行 epollwait 等待文件描述符转换成可读或者可写。当 epollwait 函数返回的值大于 0 时,就意味着被监控的文件描述符出现了待处理的事件。处理的事件总共包含两种,一种是调用 runtime.netpollBreak
函数触发的事件,该函数的作用是中断网络轮询器;另一种是其他文件描述符的正常读写事件,对于这些事件,我们会交给 runtime.netpollready
处理
代码实现
//$GOROOT/src/net/tcpsock.go
type TCPConn struct {
conn
}
//$GOROOT/src/net/net.go
type conn struct {
fd *netFD
}
// $GOROOT/src/net/fd_unix.go
// Network file descriptor.
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
// $GOROOT/src/internal/poll/fd_unix.go
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
// Writev cache.
iovecs *[]syscall.Iovec
... ...
}
net.conn只是*netFD 的外层包裹结构,最终 Write 和 Read 都会落在其中的fd字段上,netFD 在不同平台上有着不同的实现。
// $GOROOT/src/internal/poll/fd_unix.go
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
func (fd *FD) Write(p []byte) (int, error) {
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
if err := fd.pd.prepareWrite(fd.isFile); err != nil {
return 0, err
}
var nn int
for {
max := len(p)
if fd.IsStream && max-nn > maxRW {
max = nn + maxRW
}
n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
if n > 0 {
nn += n
}
if nn == len(p) {
return nn, err
}
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitWrite(fd.isFile); err == nil {
continue
}
}
if err != nil {
return nn, err
}
if n == 0 {
return nn, io.ErrUnexpectedEOF
}
}
}
sysmon 唤醒协程
// file: src/runtime/proc.go
func sysmon() {
...
list := netpoll(0) // 会不断触发对netpoll 的调用,netpoll 会调用epollwait查看是否有网络事件发生
}
func netpoll(delay int64) gList {
...
retry:
n := epollwait(epfd, &events[0], int3(len(events)), waitms)
if n < 0 {
// 没有网络事件
goto retry
}
for i := int32(0); i < n; i++ {
// 查看是读事件还是写事件发生
var mode int32
if ev.events&(_EPOLLIN | _EPOLLRDHUP | _EPOLLHUP | _EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT | _EPOLLRLHUP | _EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// ev.data 就是就绪的socket fd,根据fd 拿到pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events&( _EPOLLHUP | _EPOLLERR) != 0 {
pd.everr = true
}
netpollready(&toRun, pd, mode) // 将对应的g 推入可运行队列等待调度执行
}
}
}
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
io 前后的GPM
G1 正在 M 上执行,还有 3 个 Goroutine 在 LRQ 上等待执行。网络轮询器空闲着,什么都没干。
G1 想要进行网络系统调用,因此它被移动到网络轮询器并且处理异步网络系统调用。然后,M 可以从LRQ 执行另外的 Goroutine。此时,G2 就被上下文切换到 M 上了。
异步网络系统调用由网络轮询器完成,G1 被移回到 P 的 LRQ 中。一旦 G1 可以在 M 上进行上下文切换,它负责的 Go 相关代码就可以再次执行。
执行网络系统调用不需要额外的 M。网络轮询器使用系统线程,它时刻处理一个有效的事件循环/eventloop。