前言
net 库要考虑的几个问题
- io 模型,一般是nio/epoll。ET 还是LT 触发?
 - 线程模型,一个con 一个goroutine 还是其它?
 - 内存管理/ buffer 设计
 
业务层一般处理逻辑
tcp 代码示例
func handleConn(c net.Conn) {
    defer c.Close()
    for {
        // read from the connection
        // ... ...
        // write to the connection
        //... ...
    }
}
func main() {
    l, err := net.Listen("tcp", ":8888")
    if err != nil {
        fmt.Println("listen error:", err)
        return
    }
    for {
        c, err := l.Accept()
        if err != nil {
            fmt.Println("accept error:", err)
            break
        }
        // start a new goroutine to handle
        // the new connection.
        go handleConn(c)
    }
}
TCP 连接上的数据是一个没有边界的字节流,但在业务层眼中,没有字节流,只有各种协议消息。因此,无论是从客户端到服务端,还是从服务端到客户端,业务层在连接上看到的都应该是一个挨着一个的协议消息流。
对应到 代码上就是对 handleConn 进一步抽象/逻辑拆分,将业务逻辑转化到 handlePacket 上。handleConn 和 handlePacket 可以进一步拆分为 “粘包”(Frame) 和 “序列化”(Packet)两个部分
// handleConn的调用结构
read frame from conn
    ->frame decode
      -> handle packet
        -> packet decode
        -> packet(ack) encode
    ->frame(ack) encode
write ack frame to conn
这个层次就很清晰了,复杂逻辑/需求逐层分解,也为分析类似 代码提供了分析的切入点(结构化思维)不会被带到细节里面去。
type Packet interface {
    Decode(io.Reader)(Packet,error)    
    Encode(io.Writer,Packet) error 
}
func handleConn(c net.Conn) {
    defer c.Close()
	packet = new ...
    for {
        // read from the connection
        packet, err := Decode(c)
		ackPacket, err := handlePacket(packet)
        // write to the connection
        err := Encode(c,ackPacket)
    }
}
各个场景下的代码示例
http 代码示例
func helloHandler(w http.ResponseWriter, req *http.Request) {
    io.WriteString(w, "hello, world!\n")
}
func main() {
    http.HandleFunc("/", helloHandler)
    http.ListenAndServe(":12345", nil)
}
两个大循环。PS:有点netty bossGroup 和workGroup的感觉了。
- 处理连接:http.ListenAndServe ==> server.ListenAndServe ==> rw := l.Accept(); c := srv.newConn(rw) ==> go c.serve(connCtx)。 一个 for 循环里面在不停地 Accept,这个 Accept 在没有连接时是阻塞的,当有连接时,起一个新的协程来处理。
 - 处理请求:conn.serve ==> c.readRequest; DefaultServeMux.ServeHTTP; w.finishRequest() 。 也是一个for循环,循环里面主要是读取一个请求,然后将请求交给 Handler 处理。因为每个 serve 处理的是一个连接,一个连接可以有多次请求。
 
// src/net/http/server.go
// 封装 bind、listen、accept
func ListenAndServe(addr string, handler Handler) error {
  server := &Server{Addr: addr, Handler: handler}
  return server.ListenAndServe()
}
func (srv *Server) ListenAndServe() error {
  if srv.shuttingDown() {
    return ErrServerClosed
  }
  addr := srv.Addr
  if addr == "" {
    addr = ":http"
  }
  ln, err := net.Listen("tcp", addr)
  return srv.Serve(ln)
}
func (srv *Server) Serve(l net.Listener) error {
  origListener := l
  l = &onceCloseListener{Listener: l}
  defer l.Close()
  if err := srv.setupHTTP2_Serve(); err != nil {
    return err
  }
  ctx := context.WithValue(baseCtx, ServerContextKey, srv)
  for {
    rw, err := l.Accept()                 // 接收客户端请求
    c := srv.newConn(rw)                  // 创建一个新的连接
    go c.serve(connCtx)                   // 起一个goroutine处理客户端请求
  }
}
// src/net/http/server.go
func (c *conn) serve(ctx context.Context) {
  // HTTP/1.x from here on.
  c.r = &connReader{conn: c}
  c.bufr = newBufioReader(c.r)
  c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
  for {
    // 读取一个请求,然后将请求交给 Handler 处理
    w, err := c.readRequest(ctx) // 从连接中获取HTTP请求并构建一个 http.response
    serverHandler{c.server}.ServeHTTP(w, w.req) // 处理请求
    w.finishRequest()
  }
}
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
  handler := sh.srv.Handler
  if handler == nil {
    handler = DefaultServeMux
  }
  handler.ServeHTTP(rw, req)
}
请求如何路由?注册处理器
- HandleFunc 将 
<pattern,handler>注册到 Mutex 中 - ServeHTTP 负责 从Mutex 拿到url 匹配的handler 并调用执行
 
使用map进行存储,本质是一个静态索引,同时维护了一个切片,用来做前缀匹配,只要以/结尾的,都会在切片中存储。
// src/net/http/server.go
func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
  DefaultServeMux.HandleFunc(pattern, handler)
}
type ServeMux struct {
  mu    sync.RWMutex
  m     map[string]muxEntry
  es    []muxEntry // slice of entries sorted from longest to shortest.
  hosts bool       // whether any patterns contain hostnames
}
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
  if handler == nil {
    panic("http: nil handler")
  }
  mux.Handle(pattern, HandlerFunc(handler))
}
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
  if r.RequestURI == "*" {
    if r.ProtoAtLeast(1, 1) {
      w.Header().Set("Connection", "close")
    }
    w.WriteHeader(StatusBadRequest)
    return
  }
  h, _ := mux.Handler(r)  // 进行路由匹配,获取注册的处理函数
  h.ServeHTTP(w, r)       // 这块就是执行我们注册的handler
}
服务端监听端口本质也是使用net网络库进行TCP连接,然后监听对应的TCP连接,每一个HTTP请求都会开一个goroutine去处理请求。所以如果有海量请求,会在一瞬间创建大量的goroutine,这个可能是一个性能瓶颈点。

这在java 里就是一整个tomcat干的活儿了。
grpc 代码示例
demo helloworld helloworld.proto helloworld.pb.go ## 基于helloworld.proto 生成 server main.go client main.go
服务端main.go 示例
package main
const (
    port = ":50051"
)
// server is used to implement helloworld.GreeterServer.
type server struct{}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    log.Printf("Received: %v", in.Name)
    return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    helloworld.RegisterGreeterServer(s, &server{})
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}
helloworld.pb.go 中定义了RegisterGreeterServer 方法,除传入grpc.Server外,第二个参数是定义好的 GreeterServer interface。 由此可见,grpc 与java thrift 异曲同工
- 定义thrift 文件
 - thrift 命令基于thrift 文件生成 对应语言的 代码文件,包含了服务 接口
 - 开发者提供 接口实现类
 
上层封装——以getty 为例
Go 语言网络库 getty 的那些事 对go netpoller 的信心:Getty 只考虑使用 Go 语言原生的网络接口,如果遇到网络性能瓶颈也只会在自身层面寻找优化突破点。
go 通过netpoller 可以在保持异步化运行机制的同时,用同步方式写代码(goroutine-per-connection)。但毕竟代码拿到的还 是字节流,对于上层业务处理 编码工作量仍然非常大,于是 就像netty 之于java 一样 国内大佬 开发了 getty。Getty 严格遵循着分层设计的原则,主要分为数据交互层、业务控制层、网络层,同时还提供非常易于扩展的监控接口。 无论是基于getty 开发上层服务 还是 学习getty 之后再去 学习类似mosn/dubbo-go 等 都会大大降低 心智负担。
Go 语言网络库 getty 的那些事 后续性能优化部分未细读
分层
| 分层 | 接口 | 用户需要做的工作 | 
|---|---|---|
| EventListener/业务处理 | onOpen/onError/onClose/onMessage/onCron | 在onMessage 中写入业务逻辑 | 
| 数据交互层/编解码 | ReadWriter | 机型byte[] 和 Message 的转换 | 
| 业务控制层 | Connection/Session | |
| 网络层 | Socket | 
In getty there are two goroutines in one connection(session), one reads tcp stream/udp packet/websocket package, the other handles logic process and writes response into network write buffer. If your logic process may take a long time, you should start a new logic process goroutine by yourself in codec.go:(Codec)OnMessage.

// Reader is used to unmarshal a complete pkg from buffer
type Reader interface {
	// Read Parse tcp/udp/websocket pkg from buffer and if possible return a complete pkg.
	// When receiving a tcp network streaming segment, there are 4 cases as following:
	// case 1: a error found in the streaming segment;
	// case 2: can not unmarshal a pkg header from the streaming segment;
	// case 3: unmarshal a pkg header but can not unmarshal a pkg from the streaming segment;
	// case 4: just unmarshal a pkg from the streaming segment;
	// case 5: unmarshal more than one pkg from the streaming segment;
	//
	// The return value is (nil, 0, error) as case 1.
	// The return value is (nil, 0, nil) as case 2.
	// The return value is (nil, pkgLen, nil) as case 3.
	// The return value is (pkg, pkgLen, nil) as case 4.
	// The handleTcpPackage may invoke func Read many times as case 5.
	Read(Session, []byte) (interface{}, int, error)
}
// Writer is used to marshal pkg and write to session
type Writer interface {
	// Write if @Session is udpGettySession, the second parameter is UDPContext.
	Write(Session, interface{}) ([]byte, error)
}
// ReadWriter interface use for handle application packages
type ReadWriter interface {
	Reader
	Writer
}
见过的最好的协议层定义:ReadWriter 接口定义代码如上。Read 接口之所以有三个返回值,是为了处理 TCP 流粘包情况:
- 如果发生了网络流错误,如协议格式错误,返回 (nil, 0, error)
 - 如果读到的流很短,其头部 (header) 都无法解析出来,则返回 (nil, 0, nil)
 - 如果读到的流很短,可以解析出其头部 (header) 但无法解析出整个包 (package),则返回 (nil, pkgLen, nil)
 - 如果能够解析出一个完整的包 (package),则返回 (pkg, 0, error)
 
session 负责客户端的一次连接建立的管理,会持有buffer、channel 等用于 工作流中 数据暂存、协程间沟通. session 的listen、accept方法返回值 都赋值给session 自己的成员,“自产自销”。
- 向下 Session 对 Go 内置的网络库做了完善的封装,包括对 net.Conn 的数据流读写、超时机制等。
 - 向上,Session 提供了业务可切入的接口,用户只需实现 EventListener 就可以将 Getty 接入到自己的业务逻辑中。
 
Connection 根据不同的通信模式对 Go 内置网络库进行了抽象封装,Connection 分别有三种实现gettyTCPConn/gettyUDPConn/gettyWSConn。 PS Session Interface 和 session struct 都聚合了Connection,屏蔽不同的传输层差异, 可能是Connection 和 Session 拆开的原因。
启动流程
在 Getty 中,server 服务的启动流程
server := getty.NewTCPServer(options...)
server.RunEventLoop(newSession NewSessionCallback)
// type NewSessionCallback func(Session) error
// 用户需要通过该函数,为 session 设置好要用的 Reader、Writer 以及 EventListener。
func newSession(session getty.Session) error {
    ...
    session.SetPkgHandler(echoPkgHandler)       // 在 echoPkgHandler 中实现编解码
	session.SetEventListener(echoMsgHandler)    // 在onMessage 中实现业务逻辑
    ession.SetReadTimeout(conf.GettySessionParam.tcpReadTimeout)
	session.SetWriteTimeout(conf.GettySessionParam.tcpWriteTimeout)
    ...
}

数据读取
RunEventLoop.RunEventLoop 收到新的连接 session 即执行 session.run
- 处理byte[]: session.handlePackage ==> session.handleTCPPackage
 - 处理协议消息: reader.Read 得到pkg ==> session.addTask ==> session.listener.OnMessage
 
// github.com/AlexStocks/getty/transport/server.go
func (s *server) RunEventLoop(newSession NewSessionCallback) {
    err := s.listen()	  			// ==> server.listenTCP ==> server.streamListener = net.Listen("tcp", server.addr) 开启监听
	switch s.endPointType {
	case TCP_SERVER:
		s.runTcpEventLoop(newSession)
    ...
	}
}
func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		for {
			client, err = s.accept(newSession) // conn = server.streamListener.Accept(); session := newTCPSession(conn, server)
			client.(*session).run()
		}
	}()
}
// github.com/AlexStocks/getty/transport/session.go
func (s *session) run() {
    err := s.listener.OnOpen(s)
	go s.handleLoop()           // 发送网络字节流、调用 EventListener.OnCron() 执行定时逻辑
	go s.handlePackage()        // 读取字节数据,转为message,移交给   Goroutine Pool 处理业务逻辑
}
读取数据
func (s *session) handlePackage() {
    ...
	err = s.handleTCPPackage()
    ...
}
func (s *session) handleTCPPackage() error {
	conn = s.Connection.(*gettyTCPConn)
	for {
	    bufLen, err = conn.recv(buf)
		pktBuf.Write(buf[:bufLen])
		for {
			pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())	// 将字节流转为pkg
            ...
			s.addTask(pkg)									    // 交给业务逻辑处理
			pktBuf.Next(pkgLen)
		}
	}
	return perrors.WithStack(err)
}
func (s *session) addTask(pkg interface{}) {
	f := func() {
		s.listener.OnMessage(s, pkg)
		s.incReadPkgNum()
	}
	if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil { // 交给协程池执行
		taskPool.AddTask(f)
		return
	}
	f()	// 直接执行
}
发送数据
发送网络字节流、调用 EventListener.OnCron() 执行定时逻辑
func (s *session) handleLoop() {
	for {
		select {
		case <-s.done: ...
		case outPkg, ok = <-s.wQ:
			iovec = iovec[:0]
			for idx := 0; idx < maxIovecNum; idx++ {
				pkgBytes, err = s.writer.Write(s, outPkg)   // 编码
				iovec = append(iovec, pkgBytes)
			}
			err = s.WriteBytesArray(iovec[:]...)
		case <-wheel.After(s.period):
			if flag {
				s.listener.OnCron(s)
			}
		}
	}
}
cloudwego/netpoll
cloudwego/netpoll 认为 One Conn One Goroutine 有很大问题,大量的goroutine 上下文切换成本高,net.Conn 没有check alive的api 导致go 连接池 问题也比较多。

go func(){
  events := make([]event,128)
  for {
    n,_ := epoll_wait(epoll_fd,events,wait_msec)
    for i:=0;i<n; i++{
      read()/write()/catch(error) // 这些方法放在哪里呢?因为都是操作连接的,所以直接抽象一个conn 实现这三个方法
    }
  }
}
常见优化
带缓存的网络 I/O

每次从 net.Conn 尝试读取其内部缓存大小的数据,而不是用户传入的希望读取的数据大小。这些数据缓存在内存中,这样,后续的 Read 就可以直接从内存中得到数据,而不是每次都要从 net.Conn 读取,从而降低 Syscall 调用的频率。 有一个 生产-消费 []byte 过程:netpoller 协程 从connection/socket 读[]byte 到buffer,业务协程 从buffer 中读取[]byte decode 处理。这个“队列” 如何实现?
RingBuffer 天然合适,但是RingBuffer 存满后需要扩容,扩容需要copy,copy 会造成data race(read/write 指针不能碰面)。再进一步,如何实现一个无锁的ring buffer?使用链表解决扩容问题;使用sync.Pool 复用链表节点;维护一个length字段,通过atomic 避免data race。
重用内存对象
go tool pprof 可以观测占用内存最多的函数 和 代码(哪一行)。比如 每次服务端收到一个客户端 submit 请求时,都会在堆上分配一块内存表示 Submit 类型的实例
s := Submit{}
// 改为
var SubmitPool = sync.Pool{
    New: func() interface{} {
        return &Submit{}
    },
}
s := SubmitPool.Get().(*Submit) // 从SubmitPool池中获取一个Submit内存对象
...
SubmitPool.Put(submit)          // 将submit对象归还给Pool池