技术

下一个平台Agent 激发LLM涌现——提示工程 LLM微调理论及实践 大佬沉思 LLM外挂知识库 LLMOps 多模态LLM Python一些比较有意思的库 LLM部分技术源码学习 LangChain源码学习 通用分布式计算引擎Ray Python并发 go依赖注入 go collection gc的基本原理 golang性能分析及优化 数据湖 高性能计算与存储 Linux2.1.13网络源代码学习 《大数据经典论文解读》 三驾马车学习 Spark 内存管理及调优 Yarn学习 从Spark部署模式开始讲源码分析 容器狂占内存资源怎么办? 多角度理解一致性 golang io使用及优化模式 Flink学习 c++学习 学习ebpf go设计哲学 ceph学习 学习mesh kvm虚拟化 学习MQ go编译器以及defer实现 学习go 为什么要有堆栈 汇编语言 计算机组成原理 运行时和库 Prometheus client mysql 事务 mysql 事务的隔离级别 mysql 索引 坏味道 学习分布式 学习网络 学习Linux go堆内存分配 golang 系统调用与阻塞处理 Goroutine 调度过程 重新认识cpu mosn有的没的 负载均衡泛谈 单元测试的新解读 《Redis核心技术与实现》笔记 《Prometheus监控实战》笔记 Prometheus 告警学习 calico源码分析 对容器云平台的理解 Prometheus 源码分析 并发的成本 基础设施优化 hashicorp raft源码学习 docker 架构 mosn细节 与微服务框架整合 Java动态代理 编程范式 并发通信模型 《网络是怎样连接的》笔记 go channel codereview gc分析 jvm 线程实现 go打包机制 go interface及反射 如何学习Kubernetes 《编译原理之美》笔记——后端部分 《编译原理之美》笔记——前端部分 Pilot MCP协议分析 go gc 内存管理玩法汇总 软件机制 istio流量管理 Pilot源码分析 golang io 学习Spring mosn源码浅析 MOSN简介 《datacenter as a computer》笔记 学习JVM Tomcat源码分析 Linux可观测性 学习存储 学计算 Gotty源码分析 kubernetes operator kaggle泰坦尼克问题实践 kubernetes扩缩容 神经网络模型优化 直觉上理解深度学习 如何学习机器学习 TIDB源码分析 什么是云原生 Alibaba Java诊断工具Arthas TIDB存储——TIKV 《Apache Kafka源码分析》——简介 netty中的线程池 guava cache 源码分析 Springboot 启动过程分析 Spring 创建Bean的年代变迁 Linux内存管理 自定义CNI IPAM 共识算法 spring redis 源码分析 kafka实践 spring kafka 源码分析 Linux进程调度 让kafka支持优先级队列 Codis源码分析 Redis源码分析 C语言学习 《趣谈Linux操作系统》笔记 docker和k8s安全访问机制 jvm crash分析 Prometheus 学习 Kubernetes监控 容器日志采集 Kubernetes 控制器模型 容器狂占资源怎么办? Kubernetes资源调度——scheduler 时序性数据库介绍及对比 influxdb入门 maven的基本概念 《Apache Kafka源码分析》——server Kubernetes类型系统 源码分析体会 《数据结构与算法之美》——算法新解 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 ansible学习 Kubernetes源码分析——从kubectl开始 jib源码分析之Step实现 jib源码分析之细节 线程排队 跨主机容器通信 jib源码分析及应用 为容器选择一个合适的entrypoint kubernetes yaml配置 《持续交付36讲》笔记 mybatis学习 程序猿应该知道的 无锁数据结构和算法 CNI——容器网络是如何打通的 为什么很多业务程序猿觉得数据结构和算法没用? 串一串一致性协议 当我在说PaaS时,我在说什么 《数据结构与算法之美》——数据结构笔记 PouchContainer技术分享体会 harbor学习 用groovy 来动态化你的代码 精简代码的利器——lombok 学习 《深入剖析kubernetes》笔记 编程语言那些事儿 rxjava3——背压 rxjava2——线程切换 spring cloud 初识 《深入拆解java 虚拟机》笔记 《how tomcat works》笔记 hystrix 学习 rxjava1——概念 Redis 学习 TIDB 学习 如何分发计算 Storm 学习 AQS1——论文学习 Unsafe Spark Stream 学习 linux vfs轮廓 《自己动手写docker》笔记 java8 实践 中本聪比特币白皮书 细读 区块链泛谈 比特币 大杂烩 总纲——如何学习分布式系统 hbase 泛谈 forkjoin 泛谈 看不见摸不着的cdn是啥 《jdk8 in action》笔记 程序猿视角看网络 bgp初识 calico学习 AQS——粗略的代码分析 我们能用反射做什么 web 跨域问题 《clean code》笔记 《Elasticsearch权威指南》笔记 mockito简介及源码分析 2017软件开发小结—— 从做功能到做系统 《Apache Kafka源码分析》——clients dns隐藏的一个坑 《mysql技术内幕》笔记 log4j学习 为什么netty比较难懂? 递归、回溯、动态规划 apollo client源码分析及看待面向对象设计 学习并发 docker运行java项目的常见问题 OpenTSDB 入门 spring事务小结 分布式事务 javascript应用在哪里 《netty in action》读书笔记 netty对http2协议的解析 ssl证书是什么东西 http那些事 苹果APNs推送框架pushy apple 推送那些事儿 编写java框架的几大利器 java内存模型和jvm内存布局 java exception Linux IO学习 netty内存管理 测试环境docker化实践 netty在框架中的使用套路 Nginx简单使用 《Linux内核设计的艺术》小结 Go并发机制及语言层工具 Linux网络源代码学习——数据包的发送与接收 《docker源码分析》小结 docker namespace和cgroup zookeeper三重奏 数据库的一些知识 Spark 泛谈 链式处理的那些套路 netty回顾 Thrift基本原理与实践(二) Thrift基本原理与实践(一) 回调 异步执行抽象——Executor与Future Docker0.1.0源码分析 java gc Jedis源码分析 深度学习泛谈 Linux网络命令操作 JTA与TCC 换个角度看待设计模式 Scala初识 向Hadoop学习NIO的使用 以新的角度看数据结构 并发控制相关的硬件与内核支持 systemd 简介 quartz 源码分析 基于docker搭建测试环境(二) spring aop 实现原理简述 自己动手写spring(八) 支持AOP 自己动手写spring(七) 类结构设计调整 分析log日志 自己动手写spring(六) 支持FactoryBean 自己动手写spring(九) 总结 自己动手写spring(五) bean的生命周期管理 自己动手写spring(四) 整合xml与注解方式 自己动手写spring(三) 支持注解方式 自己动手写spring(二) 创建一个bean工厂 自己动手写spring(一) 使用digester varnish 简单使用 关于docker image的那点事儿 基于docker搭建测试环境 分布式配置系统 JVM执行 git maven/ant/gradle/make使用 再看tcp kv系统 java nio的多线程扩展 《Concurrency Models》笔记 回头看Spring IOC IntelliJ IDEA使用 Java泛型 vagrant 使用 Go常用的一些库 Python初学 Goroutine 调度模型 虚拟网络 《程序员的自我修养》小结 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件 Go基础 JVM类加载 硬币和扑克牌问题 LRU实现 virtualbox 使用 ThreadLocal小结 docker快速入门

架构

大模型推理服务框架 模型服务化(未完成) 大模型RHLF 大模型训练 大模型推理 从Attention到Transformer k8s设备管理 LLM工具栈 ddd从理念到代码 如何应用LLM 小鼠如何驾驭大象(LLM)? 多类型负载协调员Koordinator controller-runtime细节分析 finops学习 kubevela多集群 kubevela中cue的应用 基于k8s的工作流 容器和CPU那些事儿 kubevela源码分析 数据集管理fluid 应用管理平台kubevela karmada支持crd 多集群管理 AutoML和AutoDL 特征平台 实时训练 分布式链路追踪 helm tensorflow原理——python层分析 如何学习tensorflow 数据并行——allreduce 数据并行——ps 机器学习中的python调用c 机器学习训练框架概述 embedding的原理及实践 tensornet源码分析 大模型训练和推理 X的生成——特征工程 tvm tensorflow原理——core层分析 模型演变 《深度学习推荐系统实战》笔记 keras 和 Estimator tensorflow分布式训练 分布式训练的一些问题 基于Volcano的弹性训练 图神经网络 pytorch弹性分布式训练 从混部到统一调度 从RNN到Attention pytorch分布式训练 CNN 《动手学深度学习》笔记 pytorch与线性回归 多活 volcano特性源码分析 推理服务 kubebuilder 学习 mpi 学习pytorch client-go学习 tensorflow学习 提高gpu 利用率 GPU与容器的结合 GPU入门 AI云平台梳理 tf-operator源码分析 k8s批处理调度/Job调度 喜马拉雅容器化实践 Kubernetes 实践 学习rpc BFF openkruise学习 可观察性和监控系统 基于Kubernetes选主及应用 《许式伟的架构课》笔记 Admission Controller 与 Admission Webhook 发布平台系统设计 k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 深入controller openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go informer源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? 《软件设计之美》笔记 mecha 架构学习 Kubernetes events学习及应用 CRI 资源调度泛谈 业务系统设计原则 grpc学习 元编程 以应用为中心 istio学习 下一代微服务Service Mesh 《实现领域驱动设计》笔记 概率论 serverless 泛谈 《架构整洁之道》笔记 处理复杂性 那些年追过的并发 服务器端编程 网络通信协议 架构大杂烩 如何学习架构 《反应式设计模式》笔记 项目的演化特点 反应式架构摸索 函数式编程的设计模式 服务化 ddd反模式——CRUD的败笔 研发效能平台 重新看面向对象设计 业务系统设计的一些体会 函数式编程 《左耳听风》笔记 业务程序猿眼中的微服务管理 DDD实践——CQRS 项目隔离——案例研究 《编程的本质》笔记 系统故障排查汇总及教训 平台支持类系统的几个点 代码腾挪的艺术 abtest 系统设计汇总 《从0开始学架构》笔记 初级权限系统设计 领域驱动理念 现有上传协议分析 移动网络下的文件上传要注意的几个问题 推送系统的几个基本问题 做配置中心要想好的几个基本问题 不同层面的异步 分层那些事儿 性能问题分析 用户认证问题 资源的分配与回收——池 消息/任务队列

标签

k8s设备管理 多类型负载协调员Koordinator controller-runtime细节分析 finops学习 kubevela多集群 kubevela中cue的应用 基于k8s的工作流 容器和CPU那些事儿 kubevela源码分析 数据集管理fluid 应用管理平台kubevela karmada支持crd 多集群管理 helm 从混部到统一调度 volcano特性源码分析 kubebuilder 学习 client-go学习 tf-operator源码分析 k8s批处理调度/Job调度 喜马拉雅容器化实践 Kubernetes 实践 openkruise学习 基于Kubernetes选主及应用 Admission Controller 与 Admission Webhook k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 深入controller openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go informer源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? Kubernetes events学习及应用 CRI 资源调度泛谈 如何学习Kubernetes 以应用为中心 kubernetes operator kubernetes扩缩容 serverless 泛谈 什么是云原生 自定义CNI IPAM docker和k8s安全访问机制 Kubernetes监控 Kubernetes 控制器模型 Kubernetes资源调度——scheduler Kubernetes类型系统 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 Kubernetes源码分析——从kubectl开始 kubernetes yaml配置 CNI——容器网络是如何打通的 当我在说PaaS时,我在说什么 《深入剖析kubernetes》笔记 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件

mosn源码浅析

2019年12月21日

前言

先聊聊七层负载均衡

现代网络负载均衡与代理(上) 客户端建立一个到负载均衡 器的 TCP 连接。负载均衡器终结该连接(即直接响应 SYN),然后选择一个后端,并与该后端建立一个新的 TCP 连接(即发送一个新的 SYN)。四层负载均衡器通常只在四层 TCP/UDP 连接/会话级别上运行。因此, 负载均衡器通过转发数据,并确保来自同一会话的字节在同一后端结束。四层负载均衡器 不知道它正在转发数据的任何应用程序细节。数据内容可以是 HTTP, Redis, MongoDB,或任 何应用协议。

四层负载均衡有哪些缺点是七层(应用)负载均衡来解决的呢? 假如两个 gRPC/HTTP2 客户端通过四层负载均衡器连接想要与一个后端通信。四层负载均衡器为每个入站 TCP 连接创建一个出站的 TCP 连接,从而产生两个入站和两个出站的连接(CA ==> loadbalancer ==> SA, CB ==> loadbalancer ==> SB)。假设,客户端 A 每分钟发送 1 个请求,而客户端 B 每秒发送 50 个请求,则SA 的负载是 SB的 50倍。所以四层负载均衡器问题随着时 间的推移变得越来越不均衡。

上图 显示了一个七层 HTTP/2 负载均衡器。在本例中,客户端创建一个到负载均衡器的HTTP/2 TCP 连接。负载均衡器创建连接到两个后端。当客户端向负载均衡器发送两个HTTP/2 流时,流 1 被发送到后端 1,流 2 被发送到后端 2。因此,即使请求负载有很大差 异的客户端也会在后端之间实现高效地分发。这就是为什么七层负载均衡对现代协议如此 重要的原因。对于mosn来说,还支持协议转换,比如client mosn 之间是http,mosn 与server 之间是 grpc 协议。

初始化和启动

MOSN 源码解析 - 启动流程Mosn := NewMosn(c) 实例化了一个 Mosn 实例。Mosn.Start() 开始运行。我们先看 MOSN 的结构:

// mosn.io/mosn/pkg/mosn/starter.go
type Mosn struct {
	servers        []server.Server
	clustermanager types.ClusterManager
	routerManager  types.RouterManager
	config         *v2.MOSNConfig
	adminServer    admin.Server
	xdsClient      *xds.Client
	wg             sync.WaitGroup
	// for smooth upgrade. reconfigure
	inheritListeners []net.Listener
	reconfigure      net.Conn
}
  1. servers 是一个数组,server.Server 是接口类型。目前最多只支持配置一个server
  2. clustermanager 顾名思义就是集群管理器。 types.ClusterManager 也是接口类型。这里的 cluster 指得是 MOSN 连接到的一组逻辑上相似的上游主机。MOSN 通过服务发现来发现集群中的成员,并通过主动运行状况检查来确定集群成员的健康状况。MOSN 如何将请求路由到集群成员由负载均衡策略确定。
  3. routerManager 是路由管理器,MOSN 根据路由规则来对请求进行代理。
  4. adminServer 是一个服务,可以通过 http 请求获取 MOSN 的配置、状态等等
  5. xdsClient 是 xds 协议的客户端。关于 xds, Envoy 通过查询文件或管理服务器来动态发现资源。概括地讲,对应的发现服务及其相应的 API 被称作 xDS。mosn 也使用 xDS,这样就可以兼容 istio。
  6. inheritListeners 和 reconfigure 都是为了实现 MOSN 的平滑升级和重启。具体参见MOSN 源码解析 - 启动流程

初始化

  1. 初始化配置文件路径,日志,进程id路径,unix socket 路径,trace的开关(SOFATracer)以及日志插件。
  2. 通过 server.GetInheritListeners() 来判断启动模式(普通启动或热升级/重启),并在热升级/重启的情况下继承旧 MOSN 的监听器文件描述符。
  3. 如果是热升级/重启,则设置 Mosn 状态为 Active_Reconfiguring;如果是普通启动,则直接调用 StartService(),关于 StartService 会在之后分析。
  4. 初始化指标服务。
  5. 根据是否是 Xds 模式初始化配置。

      xds 模式 非 Xds 模式
    clustermanager 使用 nil 来初始化 从配置文件中初始化
    routerManager 使用默认配置来实例化 初始化routerManager,
    并从配置文件中读取路由配置更新
    server 使用默认配置来实例化 从配置文件中读取 listener 并添加

启动

  1. 启动 xdsClient, xdsClient 负责从 pilot 周期地拉取 listeners/clusters/clusterloadassignment 配置。这个特性使得用户可以通过 crd 来动态的改变 service mesh 中的策略。
  2. 开始执行所有注册在 featuregate中 feature 的初始化函数。
  3. 解析服务注册信息
  4. MOSN 启动前的准备工作 beforeStart()
  5. 正式启动 MOSN 的服务

     for _, srv := range m.servers {
         utils.GoWithRecover(func() {
             srv.Start()
         }, nil)
     }
    

    对于当前来说,只有一个 server,这个 server 是在 NewMosn 中初始化的。

协议无关L4 数据接收

activeListener.Start ==> listener.Start ==> listener.acceptEventLoop ==> listener.accept ==> activeListener.OnAccept ==> activeRawConn.ContinueFilterChain ==> activeListener.newConnection ==> activeListener.OnNewConnection ==> connection.Start ==> connection.startRWLoop ==> connection.startReadLoop and startWriteLoop 以read 为例 ==> connection.doRead ==> connection.onRead ==> filterManager.OnRead ==> filterManager.onContinueReading ==> proxy.OnData

上述链路与源码位置结合且省略一下

mosn.io/mosn/pkg/
    server/handler.go           // activeListener.Start ==> activeListener.OnAccept ==> activeListener.OnNewConnection
    network/connection.go       // connection.Start ==> connection.onRead
           /filterManager.go    // filterManager.OnRead
    proxy/proxy.go              // proxy.OnData 

如果mosn 只支持四层负载均衡的话,到proxy.OnData 就可以可以通过 负载均衡 选择一个下游 节点发送数据了。mosn 支持七层负载均衡,L7就涉及到 协议处理了。

func (c *connection) startReadLoop() {
	for {
		select {
		case <-c.readEnabledChan:
		default:
			//真正的读取数据逻辑在这里
			err := c.doRead()
		}
	}
}
func (c *connection) doRead() (err error) {
    //为该连接创建一个buffer来保存读入的数据
	//从连接中读取数据,返回实际读取到的字节数,rawConnection对应的就是原始连接
	bytesRead, err = c.readBuffer.ReadOnce(c.rawConnection)
	//通知上层读取到了新的数据
	c.onRead()
}
func (c *connection) onRead() {
	//filterManager过滤器管理者,把读取到的数据交给过滤器链路进行处理
	c.filterManager.OnRead()
}
func (fm *filterManager) onContinueReading(filter *activeReadFilter) {
	//这里可以清楚的看到网络层读取到数据以后,通过filterManager把数据交给整个过滤器链路处理
	for ; index < len(fm.upstreamFilters); index++ {
		uf = fm.upstreamFilters[index]
        //针对还没有初始化的过滤器回调其初始化方法OnNewConnection
		buf := fm.conn.GetReadBuffer()	
        //通知过滤器进行处理
        status := uf.filter.OnData(buf)
        if status == api.Stop {
            return
        }
	}
}

network 层的read filter 过滤器对应的实现就在proxy.go文件中

func (p *proxy) OnData(buf buffer.IoBuffer) api.FilterStatus {
    //针对使用的协议类型初始化serverStreamConn
	if p.serverStreamConn == nil {
		protocol, err := stream.SelectStreamFactoryProtocol(p.context, prot, buf.Bytes())
		p.serverStreamConn = stream.CreateServerStreamConnection(p.context, protocol, p.readCallbacks.Connection(), p)
	}
	//把数据分发到对应协议的的解码器
	p.serverStreamConn.Dispatch(buf)
}

L7 基于多路复用的转发

mosn.io/mosn/pkg
    /types/stream.go        // Stream is a generic protocol stream  定义了stream 层的很多接口
    /stream
        /http/stream.go
        /http2/stream.go
        /xprotocol
            /stream.go
            /conn.go
    /stream.go      // 通用/父类实现
    /proxy
        /downstream.go
        /upstream.go

MOSN 多协议机制解析 MOSN 的底层机制与 Envoy、Nginx 并没有核心差异,同样支持基于 I/O 多路复用的 L4 读写过滤器扩展,并在此基础之上再封装 L7 的处理。但是与前两者不同的是,MOSN 针对典型的微服务通信场景,抽象出了一套适用于基于多路复用 RPC 协议的扩展框架。

多路复用的定义:允许在单条链接上,并发处理多个请求/响应。 对于http2 来说,在connection 之上专门提了一个stream 概念 以表达多路复用,一般rpc 框架则 只是通过<requestId,Request> 将请求暂存起来,当收到 响应时,从Response 中提取requestId 就可以与 Request建立关联。这样rpc 框架不用 发一个Request 收到Response 之后再发下一个 Request。像http2 一样,mosn 也专门显式化了 Stream 的概念(以及一个stream 包含header/data/trailer 3个Frame),dubbo 实现中,mosn 的Steam.streamId 就是 dubbo Frame.Header.Id

  1. MOSN 从 downstream(conn=2) 接收了一个请求 request,依据报文扩展多路复用接口 GetRequestId 获取到请求在这条连接上的身份标识(requestId=1),并记录到关联映射中待用;
  2. 请求经过 MOSN 的路由、负载均衡处理,选择了一个 upstream(conn=5),同时在这条链接上新建了一个请求流(requestId=30),并调用文扩展多路复用接口 SetRequestId 封装新的身份标识,并记录到关联映射中与 downstream 信息组合;
  3. MOSN 从 upstream(conn=5) 接收了一个响应 response,依据报文扩展多路复用接口 GetRequestId 获取到请求在这条连接上的身份标识(requestId=30)。此时可以从上下游关联映射表中,根据 upstream 信息(connId=5, requestId=30) 找到对应的 downstream 信息(connId=2, requestId=1);
  4. 依据 downstream request 的信息,调用文扩展多路复用接口 SetRequestId 设置响应的 requestId,并回复给 downstream;

MOSN 源码解析 - 协程模型

mosn 作为一个七层代理,其核心工作就是转发,L7 层转发支持http、http2 和针对微服务场景xprotocol。

  1. mosn proxy 架设了基于多路复用/Stream机制的转发:多路复用由Stream 概念表示,一个 请求/响应 对应多个frame(至少包含header 和 data 2个frame)。哪怕http 不是多路复用也 迁就了这一套约定。在proxy包中,转发逻辑由 downstream.go 和 upstream.go 完成,各个协议不需要自己实现转发逻辑,只需要向 mosn 的Stream 机制靠拢即可:实现ServerStreamConnection 和 ClientStreamConnection interface
  2. 对于微服务框架,xprotocol 进一步的封装了功能代码,各rpc 协议只需实现xprotocol.XProtocol interface。

从下游接收请求 handleRequest:proxy.onData ==> xprotocol.streamConn(serverStreamConnection).Dispatch ==> xprotocol.streamConn.handleFrame ==> xprotocol.streamConn.handleRequest ==> create serverStream(xStream) 并关联 downStream ==> downStream.OnReceive ==> downStream.receive ==> downStream.matchRoute ==> downStream.chooseHost 确定下游主机 ==> downStream.receiveHeaders/upstreamRequest.appendHeaders/xprotocol.xStream.AppendHeaders ==> downStream.receiveData/upstreamRequest.appendData/xprotocol.xStream.AppendData ==> downStream.receiveTrailers/upstreamRequest.appendTrailers/xprotocol.xStream.AppendTrailers ==> xprotocol.xStream.endStream ==> buf = xprotocol.xStream.streamConn.protocol.Encode(frame) ==> xprotocol.xStream.streamConn.netConn.Write(buf) 转发暂停

从上游接收响应 handleResponse:client.onData ==> xprotocol.streamConn(clientStreamConnection).Dispatch ==> xprotocol.streamConn.handleFrame ==> xprotocol.streamConn.handleResponse ==> clientStream = xprotocol.streamConn.clientStreams[requestId] ==> clientStreamReceiverWrapper.onReceive ==> upstreamRequest.OnReceive ==> downStream.sendNotify ==> 接收协程从先前中断的地方继续 downStream.receive ==> upstreamRequest.receiveHeaders/downStream.appendHeaders/xprotocol.xStream.AppendHeaders ==> upstreamRequest.receiveData/downStream.appendData/xprotocol.xStream.AppendData ==> upstreamRequest.receiveTrailers/downStream.appendTrailers/xprotocol.xStream.AppendTrailers ==> xprotocol.xStream.endStream ==> buf = xprotocol.xStream.streamConn.protocol.Encode(frame) ==> xprotocol.xStream.streamConn.netConn.Write(buf)

两个体会:

  1. 可以看到 基于io事件 如何实现了转发逻辑
  2. 网络数据 从字节数组都frame 再到协议对象 都是实际存在的,Connection/StreamConnection/Stream 则都是 通信两端 为维护数据状态 而产生的

以官方 dubbo example 运行为例: dubbo consumer ==> client mosn localhost:2045 ==> server mosn localhost:2046 ==> dubbo provider 0.0.0.0:20880

[DEBUG] new idlechecker: maxIdleCount:6, conn:2
// 收到client connection=2;
[DEBUG] [server] [listener] accept connection from 0.0.0.0:2045, condId= 2, remote addr:192.168.104.18:60625
[DEBUG] [2,-] [stream] [xprotocol] new stream detect, requestId = 0
[DEBUG] [2,] [proxy] [downstream] new stream, proxyId = 1 , requestId =0, oneway=false
[DEBUG] [2,] [proxy] [downstream] 0 stream filters in config
// receive header frame;
[DEBUG] [2,] [proxy] [downstream] OnReceive headers   跟上header 的具体内容
[DEBUG] [2,] [proxy] [downstream] enter phase DownFilter[1], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] enter phase MatchRoute[2], proxyId = 1
[DEBUG] [router] [routers] [MatchRoute]
[DEBUG] [router] [routers] [findVirtualHost] found default virtual host only
[DEBUG] [2,] [router] [DefaultHandklerChain] [MatchRoute] matched a route: &{0xc00023d900 .*}
[DEBUG] [2,] [proxy] [downstream] enter phase DownFilterAfterRoute[3], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] enter phase ChooseHost[4], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] route match result:&{RouteRuleImplBase:0xc00023d900 matchValue:.*}, clusterName=clientCluster
[DEBUG] [upstream] [cluster manager] clusterSnapshot.loadbalancer.ChooseHost result is 127.0.0.1:2046, cluster name = clientCluster
[DEBUG] [stream] [sofarpc] [connpool] init host 127.0.0.1:2046
[INFO] remote addr: 127.0.0.1:2046, network: tcp
// client mosn 建立与上游mosn 的connection =3;
[DEBUG] [network] [check use writeloop] Connection = 3, Local Address = 127.0.0.1:60626, Remote Address = 127.0.0.1:2046
[DEBUG] [network] [client connection connect] connect raw tcp, remote address = 127.0.0.1:2046 ,event = ConnectedFlag, error = <nil>
[DEBUG] client OnEvent ConnectedFlag, connected false
[DEBUG] [2,] [proxy] [downstream] timeout info: {GlobalTimeout:1m0s TryTimeout:0s}
[DEBUG] [2,] [proxy] [downstream] enter phase DownFilterAfterChooseHost[5], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] enter phase DownRecvHeader[6], proxyId = 1
// 向上游发送 header frame; 
[DEBUG] [2,] [proxy] [upstream] append headers: xx  跟上header 的具体内容
[DEBUG] [2,] [proxy] [upstream] connPool ready, proxyId = 1, host = 127.0.0.1:2046
[DEBUG] [2,] [stream] [xprotocol] appendHeaders, direction = 0, requestId = 1
// receive data frame;
[DEBUG] [2,] [proxy] [downstream] enter phase DownRecvData[7], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] receive data = 2.0.2mosn.io.dubbo.DemoService0.0.sayHelloLjava/lang/String;MOSNHpathmosn.io.dubbo.DemoService	interfacemosn.io.dubbo.DemoServiceversion0.0.0Z
[DEBUG] [2,] [proxy] [downstream] start a request timeout timer
// 向上游发送 data frame;
[DEBUG] [2,] [proxy] [upstream] append data:2.0.2mosn.io.dubbo.DemoService0.0.sayHelloLjava/lang/String;MOSNHpathmosn.io.dubbo.DemoService	interfacemosn.io.dubbo.DemoServiceversion0.0.0Z
[DEBUG] [2,] [stream] [xprotocol] appendData, direction = 0, requestId = 1
[DEBUG] [2,] [stream] [xprotocol] connection 3 endStream, direction = 0, requestId = 1
[DEBUG] [2,] [proxy] [downstream] enter phase WaitNotify[11], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] waitNotify begin 0xc00046c000, proxyId = 1
[DEBUG] [2,] [stream] [xprotocol] connection 3 receive response, requestId = 1
// 从上游 接收 header frame 发往下游;
[DEBUG] [2,] [proxy] [upstream] OnReceive headers: xx跟上响应header 的具体内容
[DEBUG] [2,] [proxy] [downstream] OnReceive send downstream response xx
[DEBUG] [2,] [proxy] [downstream] enter phase UpFilter[12], proxyId = 1
[DEBUG] [2,] [proxy] [downstream] enter phase UpRecvHeader[13], proxyId = 1
[DEBUG] [2,] [stream] [xprotocol] appendHeaders, direction = 1, requestId = 0
从上游接收 data frame 发往下游;
[DEBUG] [2,] [proxy] [downstream] enter phase UpRecvData[14], proxyId = 1
[DEBUG] [2,] [stream] [xprotocol] appendData, direction = 1, requestId = 0
[DEBUG] [2,] [stream] [xprotocol] connection 2 endStream, direction = 1, requestId = 0
[DEBUG] update listener write bytes: 89

基于上述认识,云原生网络代理 MOSN 的进化之路我们再来看 mosn 的分层结构设计。其中,每一层通过工厂设计模式向外暴露其接口,方便用户灵活地注册自身的需求。

  1. NET/IO 作为网络层,监测连接和数据包的到来,同时作为 listener filter 和 network filter 的挂载点;
  2. Protocol 作为多协议引擎层,对数据包进行检测,并使用对应协议做 decode/encode 处理。Protocol 层对应了代码中的 StreamConnection struct,将各个协议映射为 stream 处理机制:Dispatch(buf) 将字节数组 decode 为frame,并 ,非常重要,这也与 代码中的package 包 单纯负责 编解码是 不一样的。
  3. Stream 对 decode 的数据包做二次封装为 stream,作为 stream filter 的挂载点;
  4. Proxy 作为 MOSN 的转发框架,对封装的 stream 做 proxy 处理;

转发代码分析(从下游接收请求部分)

mosn 数据接收时,从proxy.onData 收到传上来的数据,执行对应协议的serverStreamConnection.Dispatch 经过协议解析, 字节流转成了协议的数据包,转给了StreamReceiveListener.OnReceive。proxy.downStream 实现了 StreamReceiveListener。

func (sc *streamConn) Dispatch(buf types.IoBuffer) {
	for {
		// 针对读取到的数据,按照协议类型进行解码
		frame, err := sc.protocol.Decode(streamCtx, buf)
		// No enough data
		//如果没有报错且没有解析成功,那就说明当前收到的数据不够解码,退出循环,等待更多数据到来
		if frame == nil && err == nil {
			return
		}
		//解码成功以后,开始处理该请求
		sc.handleFrame(streamCtx, xframe)
	}
}
func (sc *streamConn) handleFrame(ctx context.Context, frame xprotocol.XFrame) {
	switch frame.GetStreamType() {
	case xprotocol.Request:
		sc.handleRequest(ctx, frame, false)
	case xprotocol.RequestOneWay:
		sc.handleRequest(ctx, frame, true)
	case xprotocol.Response:
		sc.handleResponse(ctx, frame)
	}
}
func (sc *streamConn) handleRequest(ctx context.Context, frame xprotocol.XFrame, oneway bool) {
	// 1. heartbeat process
	if frame.IsHeartbeatFrame() {...}
	// 2. goaway process
	if ...{...}
	// 3. create server stream
	serverStream := sc.newServerStream(ctx, frame)
	// 4. tracer support
	// 5. inject service info
    // 6. receiver callback
    serverStream.receiver = sc.serverCallbacks.NewStreamDetect(serverStream.ctx, sender, span)
	serverStream.receiver.OnReceive(serverStream.ctx, frame.GetHeader(), frame.GetData(), nil)
}

在xprotocol 对应的ServerStreamConnection 中,每次收到一个新的xprotocol.xStream,xStream.receiver 即downStream ,downStream代码注释中提到: Downstream stream, as a controller to handle downstream and upstream proxy flow。 downStream 同时持有responseSender 成员指向Stream,用于upstream收到响应数据时 回传给client。

downStream.OnReceive 逻辑

func (s *downStream) OnReceive(ctx context.Context,..., data types.IoBuffer, ...) {
    ...
    //把给任务丢给协程池进行处理即可
    pool.ScheduleAuto(func() {
        phase := types.InitPhase
        for i := 0; i < 10; i++ {
            ...
            phase = s.receive(ctx, id, phase)
            ...
        }
    }
}

downStream.receive 总体来说在请求转发阶段,依次需要经过DownFilter -> MatchRoute -> DownFilterAfterRoute -> DownRecvHeader -> DownRecvData -> DownRecvTrailer -> WaitNofity这么几个阶段。

  1. DownFilter, mosn 的配置文件config.json 中的Listener 配置包含 stream filter 配置,就是在此处被使用

     "listeners":[
         {
             "name":"",
             "address":"", ## Listener 监听的地址
             "filter_chains":[],  ##  MOSN 仅支持一个 filter_chain
             "stream_filters":[], ## 一组 stream_filter 配置,目前只在 filter_chain 中配置了 filter 包含 proxy 时生效
         }
     ]
    
  2. MatchRoute,一个请求所属的domains 绑定了许多路由规则,目的将一个请求 路由到一个cluster 上
  3. ChooseHost,每一个cluster 对应一个连接池。 从池中 选出一个连接 赋给 downStream.upstreamRequest
  4. WaitNofity则是转发成功以后,等待被响应数据唤醒。
func (s *downStream) receive(ctx context.Context, id uint32, phase types.Phase) types.Phase {
    for i := 0; i <= int(types.End-types.InitPhase); i++ {
        switch phase {
        // init phase
        case types.InitPhase:
            phase++
        // downstream filter before route
        case types.DownFilter:
            s.runReceiveFilters(phase, s.downstreamReqHeaders, s.downstreamReqDataBuf, s.downstreamReqTrailers)
            phase++
        // match route
        case types.MatchRoute:
            //生成服务提供者的地址列表以及路由规则
            s.matchRoute()
            phase++
        // downstream filter after route
        case types.DownFilterAfterRoute:
            s.runReceiveFilters(phase, s.downstreamReqHeaders, s.downstreamReqDataBuf, s.downstreamReqTrailers)
            phase++
        // downstream receive header
        case types.DownRecvHeader:
            //这里开始依次发送数据
            s.receiveHeaders(s.downstreamReqDataBuf == nil && s.downstreamReqTrailers == nil)
            phase++
        // downstream receive data
        case types.DownRecvData:
            //check not null
            s.receiveData(s.downstreamReqTrailers == nil)
            phase++
        // downstream receive trailer
        case types.DownRecvTrailer:
            s.receiveTrailers()
            phase++
        // downstream oneway
        case types.Oneway:
            ...
        case types.Retry:
            ...
            phase++
        case types.WaitNofity:
            //这里阻塞等待返回及结果
            if p, err := s.waitNotify(id); err != nil {
				return p
            }
            phase++
        // upstream filter
        case types.UpFilter:
            s.runAppendFilters(phase, s.downstreamRespHeaders, s.downstreamRespDataBuf, s.downstreamRespTrailers)
            // maybe direct response
            phase++
        // upstream receive header
        case types.UpRecvHeader:
            // send downstream response
            // check not null
            s.upstreamRequest.receiveHeaders(s.downstreamRespDataBuf == nil && s.downstreamRespTrailers == nil)
            phase++
        // upstream receive data
        case types.UpRecvData:
            // check not null
            s.upstreamRequest.receiveData(s.downstreamRespTrailers == nil)
            phase++
        // upstream receive triler
        case types.UpRecvTrailer:
            //check not null
            s.upstreamRequest.receiveTrailers()
            phase++
        // process end
        case types.End:
            return types.End
        default:
            return types.End
        }
    }
    return types.End
}

真正的发送数据逻辑是在receiveHeaders、receiveData、receiveTrailers这三个方法里,当然每次请求不一定都需要有这三部分的数据,这里我们以receiveHeaders方法为例来进行说明:

func (s *downStream) receiveHeaders(endStream bool) {
	s.downstreamRecvDone = endStream
    ...
	clusterName := s.route.RouteRule().ClusterName()
	s.cluster = s.snapshot.ClusterInfo()
	s.requestInfo.SetRouteEntry(s.route.RouteRule())
	//初始化连接池
	pool, err := s.initializeUpstreamConnectionPool(s)
	parseProxyTimeout(&s.timeout, s.route, s.downstreamReqHeaders)
	prot := s.getUpstreamProtocol()
	s.retryState = newRetryState(s.route.RouteRule().Policy().RetryPolicy(), s.downstreamReqHeaders, s.cluster, prot)
	//构建对应的upstream请求
	proxyBuffers := proxyBuffersByContext(s.context)
	s.upstreamRequest = &proxyBuffers.request
	s.upstreamRequest.downStream = s
	s.upstreamRequest.proxy = s.proxy
	s.upstreamRequest.protocol = prot
	s.upstreamRequest.connPool = pool
	s.route.RouteRule().FinalizeRequestHeaders(s.downstreamReqHeaders, s.requestInfo)
	//这里发送数据
	s.upstreamRequest.appendHeaders(endStream)
	//这里开启超时计时器
	if endStream {
		s.onUpstreamRequestSent()
	}
}
func (r *upstreamRequest) appendHeaders(endStream bool) {
	... 
	r.connPool.NewStream(r.downStream.context, r, r)
}

与一个 downStream struct 对应的是upstreamRequest ,倒不算一对一关系。downStream 聚合一个upstreamRequest 成员,从bufferPool(本质是go的对象池sync.Pool)取出一个成员赋给 downStream.upstreamRequest,结束后会调用 downStream.cleanStream 回收。

connPool也是每个协议 不同的,以xprotocol 为例

func (p *connPool) NewStream(ctx context.Context, responseDecoder types.StreamReceiveListener, listener types.PoolEventListener) {
	subProtocol := getSubProtocol(ctx)
    //从连接池中获取连接
	client, _ := p.activeClients.Load(subProtocol)
	activeClient := client.(*activeClient)
    var streamEncoder types.StreamSender
    //这里会把streamId对应的stream保存起来
    streamEncoder = activeClient.client.NewStream(ctx, responseDecoder)
    streamEncoder.GetStream().AddEventListener(activeClient)
    //发送数据
    listener.OnReady(streamEncoder, p.host)
}

从 xprotocol.connPool 取出一个client ,创建了一个协议对应的 Stream(变量名为 streamEncoder),对xprotocol 就是xStream,最终执行了AppendHeaders

func (r *upstreamRequest) OnReady(sender types.StreamSender, host types.Host) {
	r.requestSender = sender
	r.host = host
	r.requestSender.GetStream().AddEventListener(r)
	r.startTime = time.Now()

	endStream := r.sendComplete && !r.dataSent && !r.trailerSent
    //发送数据
	r.requestSender.AppendHeaders(r.downStream.context, r.convertHeader(r.downStream.downstreamReqHeaders), endStream)

	r.downStream.requestInfo.OnUpstreamHostSelected(host)
	r.downStream.requestInfo.SetUpstreamLocalAddress(host.AddressString())
}
func (s *xStream) AppendHeaders(ctx context.Context, headers types.HeaderMap, endStream bool) (err error) {
	// type assertion
	// hijack process
    s.frame = frame
	// endStream
	if endStream {
		s.endStream()
	}
	return
}
func (s *xStream) endStream() {
    // replace requestID
    s.frame.SetRequestId(s.id)
    // remove injected headers
    buf, err := s.sc.protocol.Encode(s.ctx, s.frame)
    err = s.sc.netConn.Write(buf)
}

xStream.endStream 真正触发 网络数据的发送,网络层的write

func (c *connection) Write(buffers ...buffer.IoBuffer) (err error) {
    //同样经过过滤器
	fs := c.filterManager.OnWrite(buffers)
	if fs == api.Stop {
		return nil
	}
	if !UseNetpollMode {
		if c.useWriteLoop {
			c.writeBufferChan <- &buffers
		} else {
			err = c.writeDirectly(&buffers)
		}
	} else {
		//netpoll模式写
	}
	return
}
//在对应的eventloop.go中的startWriteLoop方法:
func (c *connection) startWriteLoop() {
	var err error
	for {
		select {
		case <-c.internalStopChan:
			return
		case buf, ok := <-c.writeBufferChan:
			c.appendBuffer(buf)
            c.rawConnection.SetWriteDeadline(time.Now().Add(types.DefaultConnWriteTimeout))
			_, err = c.doWrite()
		}
	}
}

请求数据发出去以后当前协程就阻塞了,看下waitNotify方法的实现:

func (s *downStream) waitNotify(id uint32) (phase types.Phase, err error) {
	if s.ID != id {
		return types.End, types.ErrExit
	}
	//阻塞等待
	select {
	case <-s.notify:
	}
	return s.processError(id)
}

学到的

不要硬看代码,尤其对于多协程程序

  1. 打印日志
  2. debug.PrintStack() 来查看某一个方法之前的调用栈。 再进一步,runtime.Caller 可以查看调用者所在源码文件与行号
     // 一次获取一个调用者
     // 0 表示当前函数,1 表示上一层函数,依次往上
     if pc, file, line, ok := runtime.Caller(1); ok{
         fmt.Println(runtime.FuncForPC(pc).Name(), file, line)
     }
     // 一次获取多个调用者
     pc := make([]uintptr, 10)
     n := runtime.Callers(1, pc)
     for i := 0; i < n; i++ {
         f := runtime.FuncForPC(pc[i])
         file, line := f.FileLine(pc[i])
         fmt.Printf("%s %d %s\n", file, line, f.Name())
     }
    
  3. fmt.Printf("==> %T\n",xx) 如果一个interface 有多个“实现类” 可以通过%T 查看struct 的类型