10 分钟阅读

前言

先聊聊七层负载均衡

现代网络负载均衡与代理(上) 客户端建立一个到负载均衡 器的 TCP 连接。负载均衡器终结该连接(即直接响应 SYN),然后选择一个后端,并与该后端建立一个新的 TCP 连接(即发送一个新的 SYN)。四层负载均衡器通常只在四层 TCP/UDP 连接/会话级别上运行。因此, 负载均衡器通过转发数据,并确保来自同一会话的字节在同一后端结束。四层负载均衡器 不知道它正在转发数据的任何应用程序细节。数据内容可以是 HTTP, Redis, MongoDB,或任 何应用协议。

四层负载均衡有哪些缺点是七层(应用)负载均衡来解决的呢? 假如两个 gRPC/HTTP2 客户端通过四层负载均衡器连接想要与一个后端通信。四层负载均衡器为每个入站 TCP 连接创建一个出站的 TCP 连接,从而产生两个入站和两个出站的连接(CA ==> loadbalancer ==> SA, CB ==> loadbalancer ==> SB)。假设,客户端 A 每分钟发送 1 个请求,而客户端 B 每秒发送 50 个请求,则SA 的负载是 SB的 50倍。所以四层负载均衡器问题随着时 间的推移变得越来越不均衡。

上图 显示了一个七层 HTTP/2 负载均衡器。在本例中,客户端创建一个到负载均衡器的HTTP/2 TCP 连接。负载均衡器创建连接到两个后端。当客户端向负载均衡器发送两个HTTP/2 流时,流 1 被发送到后端 1,流 2 被发送到后端 2。因此,即使请求负载有很大差 异的客户端也会在后端之间实现高效地分发。这就是为什么七层负载均衡对现代协议如此 重要的原因。对于mosn来说,还支持协议转换,比如client mosn 之间是http,mosn 与server 之间是 grpc 协议。

初始化和启动

MOSN 源码解析 - 启动流程Mosn := NewMosn(c) 实例化了一个 Mosn 实例。Mosn.Start() 开始运行。我们先看 MOSN 的结构:

// mosn.io/mosn/pkg/mosn/starter.go
type Mosn struct {
	servers        []server.Server
	clustermanager types.ClusterManager
	routerManager  types.RouterManager
	config         *v2.MOSNConfig
	adminServer    admin.Server
	xdsClient      *xds.Client
	wg             sync.WaitGroup
	// for smooth upgrade. reconfigure
	inheritListeners []net.Listener
	reconfigure      net.Conn
}
  1. servers 是一个数组,server.Server 是接口类型。目前最多只支持配置一个server
  2. clustermanager 顾名思义就是集群管理器。 types.ClusterManager 也是接口类型。这里的 cluster 指得是 MOSN 连接到的一组逻辑上相似的上游主机。MOSN 通过服务发现来发现集群中的成员,并通过主动运行状况检查来确定集群成员的健康状况。MOSN 如何将请求路由到集群成员由负载均衡策略确定。
  3. routerManager 是路由管理器,MOSN 根据路由规则来对请求进行代理。
  4. adminServer 是一个服务,可以通过 http 请求获取 MOSN 的配置、状态等等
  5. xdsClient 是 xds 协议的客户端。关于 xds, Envoy 通过查询文件或管理服务器来动态发现资源。概括地讲,对应的发现服务及其相应的 API 被称作 xDS。mosn 也使用 xDS,这样就可以兼容 istio。
  6. inheritListeners 和 reconfigure 都是为了实现 MOSN 的平滑升级和重启。具体参见MOSN 源码解析 - 启动流程

初始化

  1. 初始化配置文件路径,日志,进程id路径,unix socket 路径,trace的开关(SOFATracer)以及日志插件。
  2. 通过 server.GetInheritListeners() 来判断启动模式(普通启动或热升级/重启),并在热升级/重启的情况下继承旧 MOSN 的监听器文件描述符。
  3. 如果是热升级/重启,则设置 Mosn 状态为 Active_Reconfiguring;如果是普通启动,则直接调用 StartService(),关于 StartService 会在之后分析。
  4. 初始化指标服务。
  5. 根据是否是 Xds 模式初始化配置。

      xds 模式 非 Xds 模式
    clustermanager 使用 nil 来初始化 从配置文件中初始化
    routerManager 使用默认配置来实例化 初始化routerManager,
    并从配置文件中读取路由配置更新
    server 使用默认配置来实例化 从配置文件中读取 listener 并添加

启动

  1. 启动 xdsClient, xdsClient 负责从 pilot 周期地拉取 listeners/clusters/clusterloadassignment 配置。这个特性使得用户可以通过 crd 来动态的改变 service mesh 中的策略。
  2. 开始执行所有注册在 featuregate中 feature 的初始化函数。
  3. 解析服务注册信息
  4. MOSN 启动前的准备工作 beforeStart()
  5. 正式启动 MOSN 的服务

     for _, srv := range m.servers {
         utils.GoWithRecover(func() {
             srv.Start()
         }, nil)
     }
    

    对于当前来说,只有一个 server,这个 server 是在 NewMosn 中初始化的。

协议无关L4 数据接收

activeListener.Start ==> listener.Start ==> listener.acceptEventLoop ==> listener.accept ==> activeListener.OnAccept ==> activeRawConn.ContinueFilterChain ==> activeListener.newConnection ==> activeListener.OnNewConnection ==> connection.Start ==> connection.startRWLoop ==> connection.startReadLoop and startWriteLoop 以read 为例 ==> connection.doRead ==> connection.onRead ==> filterManager.OnRead ==> filterManager.onContinueReading ==> proxy.OnData

上述链路与源码位置结合且省略一下

mosn.io/mosn/pkg/
    server/handler.go           // activeListener.Start ==> activeListener.OnAccept ==> activeListener.OnNewConnection
    network/connection.go       // connection.Start ==> connection.onRead
           /filterManager.go    // filterManager.OnRead
    proxy/proxy.go              // proxy.OnData 

如果mosn 只支持四层负载均衡的话,到proxy.OnData 就可以可以通过 负载均衡 选择一个下游 节点发送数据了。mosn 支持七层负载均衡,L7就涉及到 协议处理了。

func (c *connection) startReadLoop() {
	for {
		select {
		case <-c.readEnabledChan:
		default:
			//真正的读取数据逻辑在这里
			err := c.doRead()
		}
	}
}
func (c *connection) doRead() (err error) {
    //为该连接创建一个buffer来保存读入的数据
	//从连接中读取数据,返回实际读取到的字节数,rawConnection对应的就是原始连接
	bytesRead, err = c.readBuffer.ReadOnce(c.rawConnection)
	//通知上层读取到了新的数据
	c.onRead()
}
func (c *connection) onRead() {
	//filterManager过滤器管理者,把读取到的数据交给过滤器链路进行处理
	c.filterManager.OnRead()
}
func (fm *filterManager) onContinueReading(filter *activeReadFilter) {
	//这里可以清楚的看到网络层读取到数据以后,通过filterManager把数据交给整个过滤器链路处理
	for ; index < len(fm.upstreamFilters); index++ {
		uf = fm.upstreamFilters[index]
        //针对还没有初始化的过滤器回调其初始化方法OnNewConnection
		buf := fm.conn.GetReadBuffer()	
        //通知过滤器进行处理
        status := uf.filter.OnData(buf)
        if status == api.Stop {
            return
        }
	}
}

network 层的read filter 过滤器对应的实现就在proxy.go文件中

func (p *proxy) OnData(buf buffer.IoBuffer) api.FilterStatus {
    //针对使用的协议类型初始化serverStreamConn
	if p.serverStreamConn == nil {
		protocol, err := stream.SelectStreamFactoryProtocol(p.context, prot, buf.Bytes())
		p.serverStreamConn = stream.CreateServerStreamConnection(p.context, protocol, p.readCallbacks.Connection(), p)
	}
	//把数据分发到对应协议的的解码器
	p.serverStreamConn.Dispatch(buf)
}

L7 基于多路复用的转发

mosn.io/mosn/pkg
    /types/stream.go        // Stream is a generic protocol stream  定义了stream 层的很多接口
    /stream
        /http/stream.go
        /http2/stream.go
        /xprotocol
            /stream.go
            /conn.go
    /stream.go      // 通用/父类实现
    /proxy
        /downstream.go
        /upstream.go

MOSN 多协议机制解析 MOSN 的底层机制与 Envoy、Nginx 并没有核心差异,同样支持基于 I/O 多路复用的 L4 读写过滤器扩展,并在此基础之上再封装 L7 的处理。但是与前两者不同的是,MOSN 针对典型的微服务通信场景,抽象出了一套适用于基于多路复用 RPC 协议的扩展框架。

多路复用的定义:允许在单条链接上,并发处理多个请求/响应。 对于http2 来说,在connection 之上专门提了一个stream 概念 以表达多路复用,一般rpc 框架则 只是通过<requestId,Request> 将请求暂存起来,当收到 响应时,从Response 中提取requestId 就可以与 Request建立关联。这样rpc 框架不用 发一个Request 收到Response 之后再发下一个 Request。像http2 一样,mosn 也专门显式化了 Stream 的概念(以及一个stream 包含header/data/trailer 3个Frame),dubbo 实现中,mosn 的Steam.streamId 就是 dubbo Frame.Header.Id

  1. MOSN 从 downstream(conn=2) 接收了一个请求 request,依据报文扩展多路复用接口 GetRequestId 获取到请求在这条连接上的身份标识(requestId=1),并记录到关联映射中待用;
  2. 请求经过 MOSN 的路由、负载均衡处理,选择了一个 upstream(conn=5),同时在这条链接上新建了一个请求流(requestId=30),并调用文扩展多路复用接口 SetRequestId 封装新的身份标识,并记录到关联映射中与 downstream 信息组合;
  3. MOSN 从 upstream(conn=5) 接收了一个响应 response,依据报文扩展多路复用接口 GetRequestId 获取到请求在这条连接上的身份标识(requestId=30)。此时可以从上下游关联映射表中,根据 upstream 信息(connId=5, requestId=30) 找到对应的 downstream 信息(connId=2, requestId=1);
  4. 依据 downstream request 的信息,调用文扩展多路复用接口 SetRequestId 设置响应的 requestId,并回复给 downstream;

MOSN 源码解析 - 协程模型

mosn 作为一个七层代理,其核心工作就是转发,L7 层转发支持http、http2 和针对微服务场景xprotocol。

  1. mosn proxy 架设了基于多路复用/Stream机制的转发:多路复用由Stream 概念表示,一个 请求/响应 对应多个frame(至少包含header 和 data 2个frame)。哪怕http 不是多路复用也 迁就了这一套约定。在proxy包中,转发逻辑由 downstream.go 和 upstream.go 完成,各个协议不需要自己实现转发逻辑,只需要向 mosn 的Stream 机制靠拢即可:实现ServerStreamConnection 和 ClientStreamConnection interface
  2. 对于微服务框架,xprotocol 进一步的封装了功能代码,各rpc 协议只需实现xprotocol.XProtocol interface。

从下游接收请求 handleRequest:proxy.onData ==> xprotocol.streamConn(serverStreamConnection).Dispatch ==> xprotocol.streamConn.handleFrame ==> xprotocol.streamConn.handleRequest ==> create serverStream(xStream) 并关联 downStream ==> downStream.OnReceive ==> downStream.receive ==> downStream.matchRoute ==> downStream.chooseHost 确定下游主机 ==> downStream.receiveHeaders/upstreamRequest.appendHeaders/xprotocol.xStream.AppendHeaders ==> downStream.receiveData/upstreamRequest.appendData/xprotocol.xStream.AppendData ==> downStream.receiveTrailers/upstreamRequest.appendTrailers/xprotocol.xStream.AppendTrailers ==> xprotocol.xStream.endStream ==> buf = xprotocol.xStream.streamConn.protocol.Encode(frame) ==> xprotocol.xStream.streamConn.netConn.Write(buf) 转发暂停

从上游接收响应 handleResponse:client.onData ==> xprotocol.streamConn(clientStreamConnection).Dispatch ==> xprotocol.streamConn.handleFrame ==> xprotocol.streamConn.handleResponse ==> clientStream = xprotocol.streamConn.clientStreams[requestId] ==> clientStreamReceiverWrapper.onReceive ==> upstreamRequest.OnReceive ==> downStream.sendNotify ==> 接收协程从先前中断的地方继续 downStream.receive ==> upstreamRequest.receiveHeaders/downStream.appendHeaders/xprotocol.xStream.AppendHeaders ==> upstreamRequest.receiveData/downStream.appendData/xprotocol.xStream.AppendData ==> upstreamRequest.receiveTrailers/downStream.appendTrailers/xprotocol.xStream.AppendTrailers ==> xprotocol.xStream.endStream ==> buf = xprotocol.xStream.streamConn.protocol.Encode(frame) ==> xprotocol.xStream.streamConn.netConn.Write(buf)

两个体会:

  1. 可以看到 基于io事件 如何实现了转发逻辑
  2. 网络数据 从字节数组都frame 再到协议对象 都是实际存在的,Connection/StreamConnection/Stream 则都是 通信两端 为维护数据状态 而产生的

以官方 dubbo example 运行为例: dubbo consumer ==> client mosn localhost:2045 ==> server mosn localhost:2046 ==> dubbo provider 0.0.0.0:20880

[DEBUG] new idlechecker: maxIdleCount:6, conn:2
// 收到client connection=2;
[DEBUG] [server] [listener] accept connection from 0.0.0.0:2045, condId= 2, remote addr:192.168.104.18:60625
[DEBUG] [2,-] [stream] [xprotocol] new stream detect, requestId = 0
[DEBUG] [2,] [proxy] [downstream] new stream, proxyId = 1 , requestId =0, oneway=false
[DEBUG] [2,] [proxy] [downstream] 0 stream filters in config
// receive header frame;
[DEBUG] [2,] [proxy] [downstream] OnReceive headers   跟上header 的具体内容
[DEBUG] [2,] [proxy] [downstream] enter phase DownFilter[1], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] enter phase MatchRoute[2], proxyId = 1
[DEBUG] [router] [routers] [MatchRoute]
[DEBUG] [router] [routers] [findVirtualHost] found default virtual host only
[DEBUG] [2,] [router] [DefaultHandklerChain] [MatchRoute] matched a route: &{0xc00023d900 .*}
[DEBUG] [2,] [proxy] [downstream] enter phase DownFilterAfterRoute[3], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] enter phase ChooseHost[4], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] route match result:&{RouteRuleImplBase:0xc00023d900 matchValue:.*}, clusterName=clientCluster
[DEBUG] [upstream] [cluster manager] clusterSnapshot.loadbalancer.ChooseHost result is 127.0.0.1:2046, cluster name = clientCluster
[DEBUG] [stream] [sofarpc] [connpool] init host 127.0.0.1:2046
[INFO] remote addr: 127.0.0.1:2046, network: tcp
// client mosn 建立与上游mosn 的connection =3;
[DEBUG] [network] [check use writeloop] Connection = 3, Local Address = 127.0.0.1:60626, Remote Address = 127.0.0.1:2046
[DEBUG] [network] [client connection connect] connect raw tcp, remote address = 127.0.0.1:2046 ,event = ConnectedFlag, error = <nil>
[DEBUG] client OnEvent ConnectedFlag, connected false
[DEBUG] [2,] [proxy] [downstream] timeout info: {GlobalTimeout:1m0s TryTimeout:0s}
[DEBUG] [2,] [proxy] [downstream] enter phase DownFilterAfterChooseHost[5], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] enter phase DownRecvHeader[6], proxyId = 1
// 向上游发送 header frame; 
[DEBUG] [2,] [proxy] [upstream] append headers: xx  跟上header 的具体内容
[DEBUG] [2,] [proxy] [upstream] connPool ready, proxyId = 1, host = 127.0.0.1:2046
[DEBUG] [2,] [stream] [xprotocol] appendHeaders, direction = 0, requestId = 1
// receive data frame;
[DEBUG] [2,] [proxy] [downstream] enter phase DownRecvData[7], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] receive data = 2.0.2mosn.io.dubbo.DemoService0.0.sayHelloLjava/lang/String;MOSNHpathmosn.io.dubbo.DemoService	interfacemosn.io.dubbo.DemoServiceversion0.0.0Z
[DEBUG] [2,] [proxy] [downstream] start a request timeout timer
// 向上游发送 data frame;
[DEBUG] [2,] [proxy] [upstream] append data:2.0.2mosn.io.dubbo.DemoService0.0.sayHelloLjava/lang/String;MOSNHpathmosn.io.dubbo.DemoService	interfacemosn.io.dubbo.DemoServiceversion0.0.0Z
[DEBUG] [2,] [stream] [xprotocol] appendData, direction = 0, requestId = 1
[DEBUG] [2,] [stream] [xprotocol] connection 3 endStream, direction = 0, requestId = 1
[DEBUG] [2,] [proxy] [downstream] enter phase WaitNotify[11], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] waitNotify begin 0xc00046c000, proxyId = 1
[DEBUG] [2,] [stream] [xprotocol] connection 3 receive response, requestId = 1
// 从上游 接收 header frame 发往下游;
[DEBUG] [2,] [proxy] [upstream] OnReceive headers: xx跟上响应header 的具体内容
[DEBUG] [2,] [proxy] [downstream] OnReceive send downstream response xx
[DEBUG] [2,] [proxy] [downstream] enter phase UpFilter[12], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] enter phase UpRecvHeader[13], proxyId = 1
[DEBUG] [2,] [stream] [xprotocol] appendHeaders, direction = 1, requestId = 0
从上游接收 data frame 发往下游;
[DEBUG] [2,] [proxy] [downstream] enter phase UpRecvData[14], proxyId = 1
[DEBUG] [2,] [stream] [xprotocol] appendData, direction = 1, requestId = 0
[DEBUG] [2,] [stream] [xprotocol] connection 2 endStream, direction = 1, requestId = 0
[DEBUG] update listener write bytes: 89

基于上述认识,云原生网络代理 MOSN 的进化之路我们再来看 mosn 的分层结构设计。其中,每一层通过工厂设计模式向外暴露其接口,方便用户灵活地注册自身的需求。

  1. NET/IO 作为网络层,监测连接和数据包的到来,同时作为 listener filter 和 network filter 的挂载点;
  2. Protocol 作为多协议引擎层,对数据包进行检测,并使用对应协议做 decode/encode 处理。Protocol 层对应了代码中的 StreamConnection struct,将各个协议映射为 stream 处理机制:Dispatch(buf) 将字节数组 decode 为frame,并 ,非常重要,这也与 代码中的package 包 单纯负责 编解码是 不一样的。
  3. Stream 对 decode 的数据包做二次封装为 stream,作为 stream filter 的挂载点;
  4. Proxy 作为 MOSN 的转发框架,对封装的 stream 做 proxy 处理;

转发代码分析(从下游接收请求部分)

mosn 数据接收时,从proxy.onData 收到传上来的数据,执行对应协议的serverStreamConnection.Dispatch 经过协议解析, 字节流转成了协议的数据包,转给了StreamReceiveListener.OnReceive。proxy.downStream 实现了 StreamReceiveListener。

func (sc *streamConn) Dispatch(buf types.IoBuffer) {
	for {
		// 针对读取到的数据,按照协议类型进行解码
		frame, err := sc.protocol.Decode(streamCtx, buf)
		// No enough data
		//如果没有报错且没有解析成功,那就说明当前收到的数据不够解码,退出循环,等待更多数据到来
		if frame == nil && err == nil {
			return
		}
		//解码成功以后,开始处理该请求
		sc.handleFrame(streamCtx, xframe)
	}
}
func (sc *streamConn) handleFrame(ctx context.Context, frame xprotocol.XFrame) {
	switch frame.GetStreamType() {
	case xprotocol.Request:
		sc.handleRequest(ctx, frame, false)
	case xprotocol.RequestOneWay:
		sc.handleRequest(ctx, frame, true)
	case xprotocol.Response:
		sc.handleResponse(ctx, frame)
	}
}
func (sc *streamConn) handleRequest(ctx context.Context, frame xprotocol.XFrame, oneway bool) {
	// 1. heartbeat process
	if frame.IsHeartbeatFrame() {...}
	// 2. goaway process
	if ...{...}
	// 3. create server stream
	serverStream := sc.newServerStream(ctx, frame)
	// 4. tracer support
	// 5. inject service info
    // 6. receiver callback
    serverStream.receiver = sc.serverCallbacks.NewStreamDetect(serverStream.ctx, sender, span)
	serverStream.receiver.OnReceive(serverStream.ctx, frame.GetHeader(), frame.GetData(), nil)
}

在xprotocol 对应的ServerStreamConnection 中,每次收到一个新的xprotocol.xStream,xStream.receiver 即downStream ,downStream代码注释中提到: Downstream stream, as a controller to handle downstream and upstream proxy flow。 downStream 同时持有responseSender 成员指向Stream,用于upstream收到响应数据时 回传给client。

downStream.OnReceive 逻辑

func (s *downStream) OnReceive(ctx context.Context,..., data types.IoBuffer, ...) {
    ...
    //把给任务丢给协程池进行处理即可
    pool.ScheduleAuto(func() {
        phase := types.InitPhase
        for i := 0; i < 10; i++ {
            ...
            phase = s.receive(ctx, id, phase)
            ...
        }
    }
}

downStream.receive 总体来说在请求转发阶段,依次需要经过DownFilter -> MatchRoute -> DownFilterAfterRoute -> DownRecvHeader -> DownRecvData -> DownRecvTrailer -> WaitNofity这么几个阶段。

  1. DownFilter, mosn 的配置文件config.json 中的Listener 配置包含 stream filter 配置,就是在此处被使用

     "listeners":[
         {
             "name":"",
             "address":"", ## Listener 监听的地址
             "filter_chains":[],  ##  MOSN 仅支持一个 filter_chain
             "stream_filters":[], ## 一组 stream_filter 配置,目前只在 filter_chain 中配置了 filter 包含 proxy 时生效
         }
     ]
    
  2. MatchRoute,一个请求所属的domains 绑定了许多路由规则,目的将一个请求 路由到一个cluster 上
  3. ChooseHost,每一个cluster 对应一个连接池。 从池中 选出一个连接 赋给 downStream.upstreamRequest
  4. WaitNofity则是转发成功以后,等待被响应数据唤醒。
func (s *downStream) receive(ctx context.Context, id uint32, phase types.Phase) types.Phase {
    for i := 0; i <= int(types.End-types.InitPhase); i++ {
        switch phase {
        // init phase
        case types.InitPhase:
            phase++
        // downstream filter before route
        case types.DownFilter:
            s.runReceiveFilters(phase, s.downstreamReqHeaders, s.downstreamReqDataBuf, s.downstreamReqTrailers)
            phase++
        // match route
        case types.MatchRoute:
            //生成服务提供者的地址列表以及路由规则
            s.matchRoute()
            phase++
        // downstream filter after route
        case types.DownFilterAfterRoute:
            s.runReceiveFilters(phase, s.downstreamReqHeaders, s.downstreamReqDataBuf, s.downstreamReqTrailers)
            phase++
        // downstream receive header
        case types.DownRecvHeader:
            //这里开始依次发送数据
            s.receiveHeaders(s.downstreamReqDataBuf == nil && s.downstreamReqTrailers == nil)
            phase++
        // downstream receive data
        case types.DownRecvData:
            //check not null
            s.receiveData(s.downstreamReqTrailers == nil)
            phase++
        // downstream receive trailer
        case types.DownRecvTrailer:
            s.receiveTrailers()
            phase++
        // downstream oneway
        case types.Oneway:
            ...
        case types.Retry:
            ...
            phase++
        case types.WaitNofity:
            //这里阻塞等待返回及结果
            if p, err := s.waitNotify(id); err != nil {
				return p
            }
            phase++
        // upstream filter
        case types.UpFilter:
            s.runAppendFilters(phase, s.downstreamRespHeaders, s.downstreamRespDataBuf, s.downstreamRespTrailers)
            // maybe direct response
            phase++
        // upstream receive header
        case types.UpRecvHeader:
            // send downstream response
            // check not null
            s.upstreamRequest.receiveHeaders(s.downstreamRespDataBuf == nil && s.downstreamRespTrailers == nil)
            phase++
        // upstream receive data
        case types.UpRecvData:
            // check not null
            s.upstreamRequest.receiveData(s.downstreamRespTrailers == nil)
            phase++
        // upstream receive triler
        case types.UpRecvTrailer:
            //check not null
            s.upstreamRequest.receiveTrailers()
            phase++
        // process end
        case types.End:
            return types.End
        default:
            return types.End
        }
    }
    return types.End
}

真正的发送数据逻辑是在receiveHeaders、receiveData、receiveTrailers这三个方法里,当然每次请求不一定都需要有这三部分的数据,这里我们以receiveHeaders方法为例来进行说明:

func (s *downStream) receiveHeaders(endStream bool) {
	s.downstreamRecvDone = endStream
    ...
	clusterName := s.route.RouteRule().ClusterName()
	s.cluster = s.snapshot.ClusterInfo()
	s.requestInfo.SetRouteEntry(s.route.RouteRule())
	//初始化连接池
	pool, err := s.initializeUpstreamConnectionPool(s)
	parseProxyTimeout(&s.timeout, s.route, s.downstreamReqHeaders)
	prot := s.getUpstreamProtocol()
	s.retryState = newRetryState(s.route.RouteRule().Policy().RetryPolicy(), s.downstreamReqHeaders, s.cluster, prot)
	//构建对应的upstream请求
	proxyBuffers := proxyBuffersByContext(s.context)
	s.upstreamRequest = &proxyBuffers.request
	s.upstreamRequest.downStream = s
	s.upstreamRequest.proxy = s.proxy
	s.upstreamRequest.protocol = prot
	s.upstreamRequest.connPool = pool
	s.route.RouteRule().FinalizeRequestHeaders(s.downstreamReqHeaders, s.requestInfo)
	//这里发送数据
	s.upstreamRequest.appendHeaders(endStream)
	//这里开启超时计时器
	if endStream {
		s.onUpstreamRequestSent()
	}
}
func (r *upstreamRequest) appendHeaders(endStream bool) {
	... 
	r.connPool.NewStream(r.downStream.context, r, r)
}

与一个 downStream struct 对应的是upstreamRequest ,倒不算一对一关系。downStream 聚合一个upstreamRequest 成员,从bufferPool(本质是go的对象池sync.Pool)取出一个成员赋给 downStream.upstreamRequest,结束后会调用 downStream.cleanStream 回收。

connPool也是每个协议 不同的,以xprotocol 为例

func (p *connPool) NewStream(ctx context.Context, responseDecoder types.StreamReceiveListener, listener types.PoolEventListener) {
	subProtocol := getSubProtocol(ctx)
    //从连接池中获取连接
	client, _ := p.activeClients.Load(subProtocol)
	activeClient := client.(*activeClient)
    var streamEncoder types.StreamSender
    //这里会把streamId对应的stream保存起来
    streamEncoder = activeClient.client.NewStream(ctx, responseDecoder)
    streamEncoder.GetStream().AddEventListener(activeClient)
    //发送数据
    listener.OnReady(streamEncoder, p.host)
}

从 xprotocol.connPool 取出一个client ,创建了一个协议对应的 Stream(变量名为 streamEncoder),对xprotocol 就是xStream,最终执行了AppendHeaders

func (r *upstreamRequest) OnReady(sender types.StreamSender, host types.Host) {
	r.requestSender = sender
	r.host = host
	r.requestSender.GetStream().AddEventListener(r)
	r.startTime = time.Now()

	endStream := r.sendComplete && !r.dataSent && !r.trailerSent
    //发送数据
	r.requestSender.AppendHeaders(r.downStream.context, r.convertHeader(r.downStream.downstreamReqHeaders), endStream)

	r.downStream.requestInfo.OnUpstreamHostSelected(host)
	r.downStream.requestInfo.SetUpstreamLocalAddress(host.AddressString())
}
func (s *xStream) AppendHeaders(ctx context.Context, headers types.HeaderMap, endStream bool) (err error) {
	// type assertion
	// hijack process
    s.frame = frame
	// endStream
	if endStream {
		s.endStream()
	}
	return
}
func (s *xStream) endStream() {
    // replace requestID
    s.frame.SetRequestId(s.id)
    // remove injected headers
    buf, err := s.sc.protocol.Encode(s.ctx, s.frame)
    err = s.sc.netConn.Write(buf)
}

xStream.endStream 真正触发 网络数据的发送,网络层的write

func (c *connection) Write(buffers ...buffer.IoBuffer) (err error) {
    //同样经过过滤器
	fs := c.filterManager.OnWrite(buffers)
	if fs == api.Stop {
		return nil
	}
	if !UseNetpollMode {
		if c.useWriteLoop {
			c.writeBufferChan <- &buffers
		} else {
			err = c.writeDirectly(&buffers)
		}
	} else {
		//netpoll模式写
	}
	return
}
//在对应的eventloop.go中的startWriteLoop方法:
func (c *connection) startWriteLoop() {
	var err error
	for {
		select {
		case <-c.internalStopChan:
			return
		case buf, ok := <-c.writeBufferChan:
			c.appendBuffer(buf)
            c.rawConnection.SetWriteDeadline(time.Now().Add(types.DefaultConnWriteTimeout))
			_, err = c.doWrite()
		}
	}
}

请求数据发出去以后当前协程就阻塞了,看下waitNotify方法的实现:

func (s *downStream) waitNotify(id uint32) (phase types.Phase, err error) {
	if s.ID != id {
		return types.End, types.ErrExit
	}
	//阻塞等待
	select {
	case <-s.notify:
	}
	return s.processError(id)
}

学到的

不要硬看代码,尤其对于多协程程序

  1. 打印日志
  2. debug.PrintStack() 来查看某一个方法之前的调用栈。 再进一步,runtime.Caller 可以查看调用者所在源码文件与行号
     // 一次获取一个调用者
     // 0 表示当前函数,1 表示上一层函数,依次往上
     if pc, file, line, ok := runtime.Caller(1); ok{
         fmt.Println(runtime.FuncForPC(pc).Name(), file, line)
     }
     // 一次获取多个调用者
     pc := make([]uintptr, 10)
     n := runtime.Callers(1, pc)
     for i := 0; i < n; i++ {
         f := runtime.FuncForPC(pc[i])
         file, line := f.FileLine(pc[i])
         fmt.Printf("%s %d %s\n", file, line, f.Name())
     }
    
  3. fmt.Printf("==> %T\n",xx) 如果一个interface 有多个“实现类” 可以通过%T 查看struct 的类型

标签:

分类:

更新时间:

留下评论