技术

对容器云平台的理解 Prometheus 源码分析 并发的成本 基础设施优化 hashicorp raft源码学习 docker 架构 mosn细节 与微服务框架整合 Java动态代理 编程范式 并发通信模型 《网络是怎样连接的》笔记 go细节 codereview mat使用 jvm 线程实现 go打包机制 go interface及反射 如何学习Kubernetes 《编译原理之美》笔记——后端部分 《编译原理之美》笔记——前端部分 Pilot MCP协议分析 go gc 内存管理玩法汇总 软件机制 istio流量管理 Pilot源码分析 golang io 学习Spring mosn源码浅析 MOSN简介 《datacenter as a computer》笔记 学习JVM Tomcat源码分析 Linux可观测性 MVCC 学习存储 学计算 Gotty源码分析 kubernetes operator kaggle泰坦尼克问题实践 kubernetes自动扩容缩容 神经网络模型优化 直觉上理解机器学习 knative入门 如何学习机器学习 神经网络系列笔记 TIDB源码分析 《阿里巴巴云原生实践15讲》笔记 Alibaba Java诊断工具Arthas TIDB存储——TIKV 《Apache Kafka源码分析》——简介 netty中的线程池 guava cache 源码分析 Springboot 启动过程分析 Spring 创建Bean的年代变迁 Linux内存管理 自定义CNI IPAM 扩展Kubernetes 副本一致性 spring redis 源码分析 kafka实践 spring kafka 源码分析 Linux进程调度 让kafka支持优先级队列 Codis源码分析 Redis源码分析 C语言学习 《趣谈Linux操作系统》笔记 docker和k8s安全机制 jvm crash分析 Kubernetes监控 Kubernetes 控制器模型 Prometheus 学习 容器日志采集 容器狂占cpu怎么办? Kubernetes资源调度——scheduler 时序性数据库介绍及对比 influxdb入门 maven的基本概念 《Apache Kafka源码分析》——server Kubernetes objects之编排对象 源码分析体会 《数据结构与算法之美》——算法新解 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 ansible学习 Kubernetes源码分析——从kubectl开始 jib源码分析之Step实现 kubernetes实践 jib源码分析之细节 线程排队 跨主机容器通信 jib源码分析及应用 为容器选择一个合适的entrypoint kubernetes yaml配置 《持续交付36讲》笔记 mybatis学习 程序猿应该知道的 无锁数据结构和算法 CNI 为什么很多业务程序猿觉得数据结构和算法没用? 串一串一致性协议 当我在说PaaS时,我在说什么 《数据结构与算法之美》——数据结构笔记 PouchContainer技术分享体会 harbor学习 用groovy 来动态化你的代码 《深入剖析kubernetes》笔记 精简代码的利器——lombok 学习 编程语言的动态性 rxjava3——背压 rxjava2——线程切换 spring cloud 初识 《深入拆解java 虚拟机》笔记 《how tomcat works》笔记 hystrix 学习 rxjava1——概念 Redis 学习 TIDB 学习 分布式计算系统的那些套路 Storm 学习 AQS1——论文学习 Unsafe Spark Stream 学习 linux vfs轮廓 mysql 批量操作优化 《自己动手写docker》笔记 java8 实践 中本聪比特币白皮书 细读 区块链泛谈 比特币 大杂烩 总纲——如何学习分布式系统 hbase 泛谈 forkjoin 泛谈 看不见摸不着的cdn是啥 《jdk8 in action》笔记 程序猿视角看网络 bgp初识 calico学习 AQS2——粗略的代码分析 我们能用反射做什么 web 跨域问题 《clean code》笔记 硬件对软件设计的影响 《Elasticsearch权威指南》笔记 mockito简介及源码分析 2017软件开发小结—— 从做功能到做系统 《Apache Kafka源码分析》——clients dns隐藏的一个坑 《mysql技术内幕》笔记2 《mysql技术内幕》笔记1 log4j学习 为什么netty比较难懂? 回溯法 apollo client源码分析及看待面向对象设计 学习并发 docker 环境(主要运行java项目)常见问题 Scala的一些梗 OpenTSDB 入门 spring事务小结 事务一致性 javascript应用在哪里 《netty in action》读书笔记 netty对http2协议的解析 ssl证书是什么东西 http那些事 苹果APNs推送框架pushy apple 推送那些事儿 编写java框架的几大利器 java内存模型 java exception Linux IO学习 network channel network byte buffer 测试环境docker化实践 netty(七)netty在框架中的使用套路 Nginx简单使用 《Linux内核设计的艺术》小结 Go并发机制及语言层工具 Macvlan Linux网络源代码学习——数据包的发送与接收 《docker源码分析》小结 docker中涉及到的一些linux知识 hystrix学习 Linux网络源代码学习——整体介绍 zookeeper三重奏 数据库的一些知识 Spark 泛谈 链式处理的那些套路 netty(六)netty回顾 Thrift基本原理与实践(二) Thrift基本原理与实践(一) 回调 异步执行抽象——Executor与Future Docker0.1.0源码分析 java gc Jedis源码分析 Redis概述 机器学习泛谈 Linux网络命令操作 JTA与TCC 换个角度看待设计模式 Scala初识 向Hadoop学习NIO的使用 以新的角度看数据结构 并发控制相关的硬件与内核支持 systemd 简介 异构数据库表在线同步 quartz 源码分析 基于docker搭建测试环境(二) spring aop 实现原理简述 自己动手写spring(八) 支持AOP 自己动手写spring(七) 类结构设计调整 分析log日志 自己动手写spring(六) 支持FactoryBean 自己动手写spring(九) 总结 自己动手写spring(五) bean的生命周期管理 自己动手写spring(四) 整合xml与注解方式 自己动手写spring(三) 支持注解方式 自己动手写spring(二) 创建一个bean工厂 自己动手写spring(一) 使用digester varnish 简单使用 关于docker image的那点事儿 基于docker搭建测试环境 分布式配置系统 JVM内存与执行 git spring rmi和thrift maven/ant/gradle使用 再看tcp 缓存系统 java nio的多线程扩展 《Concurrency Models》笔记 回头看Spring IOC IntelliJ IDEA使用 Java泛型 vagrant 使用 Go常用的一些库 Python初学 Goroutine 调度模型 虚拟网络 《程序员的自我修养》小结 VPN(Virtual Private Network) Kubernetes存储 Kubernetes 其它特性 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件 使用etcd + confd + nginx做动态负载均衡 如何通过fleet unit files 来构建灵活的服务 CoreOS 安装 CoreOS 使用 Go学习 JVM类加载 硬币和扑克牌问题 LRU实现 virtualbox 使用 ThreadLocal小结 docker快速入门

标签


mosn源码浅析

2019年12月21日

前言

初始化和启动

MOSN 源码解析 - 启动流程Mosn := NewMosn(c) 实例化了一个 Mosn 实例。Mosn.Start() 开始运行。我们先看 MOSN 的结构:

type Mosn struct {
	servers        []server.Server
	clustermanager types.ClusterManager
	routerManager  types.RouterManager
	config         *v2.MOSNConfig
	adminServer    admin.Server
	xdsClient      *xds.Client
	wg             sync.WaitGroup
	// for smooth upgrade. reconfigure
	inheritListeners []net.Listener
	reconfigure      net.Conn
}
  1. servers 是一个数组,server.Server 是接口类型。目前最多只支持配置一个server
  2. clustermanager 顾名思义就是集群管理器。 types.ClusterManager 也是接口类型。这里的 cluster 指得是 MOSN 连接到的一组逻辑上相似的上游主机。MOSN 通过服务发现来发现集群中的成员,并通过主动运行状况检查来确定集群成员的健康状况。MOSN 如何将请求路由到集群成员由负载均衡策略确定。
  3. routerManager 是路由管理器,MOSN 根据路由规则来对请求进行代理。
  4. adminServer 是一个服务,可以通过 http 请求获取 MOSN 的配置、状态等等
  5. xdsClient 是 xds 协议的客户端。关于 xds, Envoy 通过查询文件或管理服务器来动态发现资源。概括地讲,对应的发现服务及其相应的 API 被称作 xDS。mosn 也使用 xDS,这样就可以兼容 istio。
  6. inheritListeners 和 reconfigure 都是为了实现 MOSN 的平滑升级和重启。具体参见MOSN 源码解析 - 启动流程

初始化

  1. 初始化配置文件路径,日志,进程id路径,unix socket 路径,trace的开关(SOFATracer)以及日志插件。
  2. 通过 server.GetInheritListeners() 来判断启动模式(普通启动或热升级/重启),并在热升级/重启的情况下继承旧 MOSN 的监听器文件描述符。
  3. 如果是热升级/重启,则设置 Mosn 状态为 Active_Reconfiguring;如果是普通启动,则直接调用 StartService(),关于 StartService 会在之后分析。
  4. 初始化指标服务。
  5. 根据是否是 Xds 模式初始化配置。

      xds 模式 非 Xds 模式
    clustermanager 使用 nil 来初始化 从配置文件中初始化
    routerManager 使用默认配置来实例化 初始化routerManager,
    并从配置文件中读取路由配置更新
    server 使用默认配置来实例化 从配置文件中读取 listener 并添加

启动

  1. 启动 xdsClient, xdsClient 负责从 pilot 周期地拉取 listeners/clusters/clusterloadassignment 配置。这个特性使得用户可以通过 crd 来动态的改变 service mesh 中的策略。
  2. 开始执行所有注册在 featuregate中 feature 的初始化函数。
  3. 解析服务注册信息
  4. MOSN 启动前的准备工作 beforeStart()
  5. 正式启动 MOSN 的服务

     for _, srv := range m.servers {
         utils.GoWithRecover(func() {
             srv.Start()
         }, nil)
     }
    

    对于当前来说,只有一个 server,这个 server 是在 NewMosn 中初始化的。

数据转发

SOFAMosn Introduction

  1. MOSN 在 IO 层读取数据,通过 read filter 将数据发送到 Protocol 层进行 Decode
  2. Decode 出来的数据,根据不同的协议,回调到 stream 层,进行 stream 的创建和封装
  3. stream 创建完毕后,会回调到 Proxy 层做路由和转发,Proxy 层会关联上下游(upstream,downstream)间的转发关系
  4. Proxy 挑选到后端后,会根据后端使用的协议,将数据发送到对应协议的 Protocol 层,对数据重新做 Encode
  5. Encode 后的数据会发经过 write filter 并最终使用 IO 的 write 发送出去

MOSN 源码解析 - 协程模型

数据接收

network 层 读取

func (c *connection) startReadLoop() {
	for {
		select {
		case <-c.readEnabledChan:
		default:
			//真正的读取数据逻辑在这里
			err := c.doRead()
		}
	}
}
func (c *connection) doRead() (err error) {
    //为该连接创建一个buffer来保存读入的数据
	//从连接中读取数据,返回实际读取到的字节数,rawConnection对应的就是原始连接
	bytesRead, err = c.readBuffer.ReadOnce(c.rawConnection)
	//通知上层读取到了新的数据
	c.onRead()
}
func (c *connection) onRead() {
	//filterManager过滤器管理者,把读取到的数据交给过滤器链路进行处理
	c.filterManager.OnRead()
}
func (fm *filterManager) onContinueReading(filter *activeReadFilter) {
	//这里可以清楚的看到网络层读取到数据以后,通过filterManager把数据交给整个过滤器链路处理
	for ; index < len(fm.upstreamFilters); index++ {
		uf = fm.upstreamFilters[index]
        //针对还没有初始化的过滤器回调其初始化方法OnNewConnection
		buf := fm.conn.GetReadBuffer()	
        //通知过滤器进行处理
        status := uf.filter.OnData(buf)
        if status == api.Stop {
            return
        }
	}
}

network 层的read filter 过滤器对应的实现就在proxy.go文件中

func (p *proxy) OnData(buf buffer.IoBuffer) api.FilterStatus {
    //针对使用的协议类型初始化serverStreamConn
	if p.serverStreamConn == nil {
		protocol, err := stream.SelectStreamFactoryProtocol(p.context, prot, buf.Bytes())
		p.serverStreamConn = stream.CreateServerStreamConnection(p.context, protocol, p.readCallbacks.Connection(), p)
	}
	//把数据分发到对应协议的的解码器
	p.serverStreamConn.Dispatch(buf)
}

之后streamConnection/stream 等逻辑就是 根据协议的不同而不同了(初次看代码时踩了很大的坑),下面代码以xprotocol 协议为例

func (sc *streamConn) Dispatch(buf types.IoBuffer) {
	for {
		// 针对读取到的数据,按照协议类型进行解码
		frame, err := sc.protocol.Decode(streamCtx, buf)
		// No enough data
		//如果没有报错且没有解析成功,那就说明当前收到的数据不够解码,退出循环,等待更多数据到来
		if frame == nil && err == nil {
			return
		}
		//解码成功以后,开始处理该请求
		sc.handleFrame(streamCtx, xframe)
	}
}
func (sc *streamConn) handleFrame(ctx context.Context, frame xprotocol.XFrame) {
	switch frame.GetStreamType() {
	case xprotocol.Request:
		sc.handleRequest(ctx, frame, false)
	case xprotocol.RequestOneWay:
		sc.handleRequest(ctx, frame, true)
	case xprotocol.Response:
		sc.handleResponse(ctx, frame)
	}
}
func (sc *streamConn) handleRequest(ctx context.Context, frame xprotocol.XFrame, oneway bool) {
	// 1. heartbeat process
	if frame.IsHeartbeatFrame() {...}
	// 2. goaway process
	if ...{...}
	// 3. create server stream
	serverStream := sc.newServerStream(ctx, frame)
	// 4. tracer support
	// 5. inject service info
    // 6. receiver callback
    serverStream.receiver = sc.serverCallbacks.NewStreamDetect(serverStream.ctx, sender, span)
	serverStream.receiver.OnReceive(serverStream.ctx, frame.GetHeader(), frame.GetData(), nil)
}

mosn 数据接收时,从proxy.onData 收到传上来的数据,执行对应协议的serverStreamConnection.Dispatch 经过协议解析, 字节流转成了协议的数据包,转给了StreamReceiveListener.OnReceive。proxy.downStream 实现了 StreamReceiveListener。

MOSN 多协议机制解析 协议解析过程比较复杂,涉及到多路复用等机制,具体细节参见mosn细节

转发流程

在http/http2/xprotocol 对应的ServerStreamConnection 中,每次收到一个新的Stream,sc.serverCallbacks.NewStreamDetect ==> newActiveStream 生成一个新的downStream struct 对应, sc.serverCallbacks 其实就是proxy struct。downStream 代码注释中提到: Downstream stream, as a controller to handle downstream and upstream proxy flow。 downStream 同时持有responseSender 成员指向Stream,用于upstream收到响应数据时 回传给client。

downStream.OnReceive 逻辑

func (s *downStream) OnReceive(ctx context.Context,..., data types.IoBuffer, ...) {
    ...
    //把给任务丢给协程池进行处理即可
    pool.ScheduleAuto(func() {
        phase := types.InitPhase
        for i := 0; i < 10; i++ {
            ...
            phase = s.receive(ctx, id, phase)
            ...
        }
    }
}

downStream.receive 总体来说在请求转发阶段,依次需要经过DownFilter -> MatchRoute -> DownFilterAfterRoute -> DownRecvHeader -> DownRecvData -> DownRecvTrailer -> WaitNofity这么几个阶段。

  1. DownFilter, mosn 的配置文件config.json 中的Listener 配置包含 stream filter 配置,就是在此处被使用

     "listeners":[
         {
             "name":"",
             "address":"", ## Listener 监听的地址
             "filter_chains":[],  ##  MOSN 仅支持一个 filter_chain
             "stream_filters":[], ## 一组 stream_filter 配置,目前只在 filter_chain 中配置了 filter 包含 proxy 时生效
         }
     ]
    
  2. MatchRoute,一个请求所属的domains 绑定了许多路由规则,目的将一个请求 路由到一个cluster 上
  3. ChooseHost,每一个cluster 对应一个连接池
  4. WaitNofity则是转发成功以后,等待被响应数据唤醒。
func (s *downStream) receive(ctx context.Context, id uint32, phase types.Phase) types.Phase {
    for i := 0; i <= int(types.End-types.InitPhase); i++ {
        switch phase {
        // init phase
        case types.InitPhase:
            phase++
        // downstream filter before route
        case types.DownFilter:
            s.runReceiveFilters(phase, s.downstreamReqHeaders, s.downstreamReqDataBuf, s.downstreamReqTrailers)
            phase++
        // match route
        case types.MatchRoute:
            //生成服务提供者的地址列表以及路由规则
            s.matchRoute()
            phase++
        // downstream filter after route
        case types.DownFilterAfterRoute:
            s.runReceiveFilters(phase, s.downstreamReqHeaders, s.downstreamReqDataBuf, s.downstreamReqTrailers)
            phase++
        // downstream receive header
        case types.DownRecvHeader:
            //这里开始依次发送数据
            s.receiveHeaders(s.downstreamReqDataBuf == nil && s.downstreamReqTrailers == nil)
            phase++
        // downstream receive data
        case types.DownRecvData:
            //check not null
            s.receiveData(s.downstreamReqTrailers == nil)
            phase++
        // downstream receive trailer
        case types.DownRecvTrailer:
            s.receiveTrailers()
            phase++
        // downstream oneway
        case types.Oneway:
            ...
        case types.Retry:
            ...
            phase++
        case types.WaitNofity:
            //这里阻塞等待返回及结果
            if p, err := s.waitNotify(id); err != nil {
				return p
            }
            phase++
        // upstream filter
        case types.UpFilter:
            s.runAppendFilters(phase, s.downstreamRespHeaders, s.downstreamRespDataBuf, s.downstreamRespTrailers)
            // maybe direct response
            phase++
        // upstream receive header
        case types.UpRecvHeader:
            // send downstream response
            // check not null
            s.upstreamRequest.receiveHeaders(s.downstreamRespDataBuf == nil && s.downstreamRespTrailers == nil)
            phase++
        // upstream receive data
        case types.UpRecvData:
            // check not null
            s.upstreamRequest.receiveData(s.downstreamRespTrailers == nil)
            phase++
        // upstream receive triler
        case types.UpRecvTrailer:
            //check not null
            s.upstreamRequest.receiveTrailers()
            phase++
        // process end
        case types.End:
            return types.End
        default:
            return types.End
        }
    }
    return types.End
}

数据发送

真正的发送数据逻辑是在receiveHeaders、receiveData、receiveTrailers这三个方法里,当然每次请求不一定都需要有这三部分的数据,这里我们以receiveHeaders方法为例来进行说明:

func (s *downStream) receiveHeaders(endStream bool) {
	s.downstreamRecvDone = endStream
    ...
	clusterName := s.route.RouteRule().ClusterName()
	s.cluster = s.snapshot.ClusterInfo()
	s.requestInfo.SetRouteEntry(s.route.RouteRule())
	//初始化连接池
	pool, err := s.initializeUpstreamConnectionPool(s)
	parseProxyTimeout(&s.timeout, s.route, s.downstreamReqHeaders)
	prot := s.getUpstreamProtocol()
	s.retryState = newRetryState(s.route.RouteRule().Policy().RetryPolicy(), s.downstreamReqHeaders, s.cluster, prot)
	//构建对应的upstream请求
	proxyBuffers := proxyBuffersByContext(s.context)
	s.upstreamRequest = &proxyBuffers.request
	s.upstreamRequest.downStream = s
	s.upstreamRequest.proxy = s.proxy
	s.upstreamRequest.protocol = prot
	s.upstreamRequest.connPool = pool
	s.route.RouteRule().FinalizeRequestHeaders(s.downstreamReqHeaders, s.requestInfo)
	//这里发送数据
	s.upstreamRequest.appendHeaders(endStream)
	//这里开启超时计时器
	if endStream {
		s.onUpstreamRequestSent()
	}
}
func (r *upstreamRequest) appendHeaders(endStream bool) {
	... 
	r.connPool.NewStream(r.downStream.context, r, r)
}

与一个 downStream struct 对应的是upstreamRequest ,倒不算一对一关系。downStream 聚合一个upstreamRequest 成员,从bufferPool(本质是go的对象池sync.Pool)取出一个成员赋给 downStream.upstreamRequest,结束后会调用 downStream.cleanStream 回收。

connPool也是每个协议 不同的,以xprotocol 为例

func (p *connPool) NewStream(ctx context.Context, responseDecoder types.StreamReceiveListener, listener types.PoolEventListener) {
	subProtocol := getSubProtocol(ctx)
    //从连接池中获取连接
	client, _ := p.activeClients.Load(subProtocol)
	activeClient := client.(*activeClient)
    var streamEncoder types.StreamSender
    //这里会把streamId对应的stream保存起来
    streamEncoder = activeClient.client.NewStream(ctx, responseDecoder)
    streamEncoder.GetStream().AddEventListener(activeClient)
    //发送数据
    listener.OnReady(streamEncoder, p.host)
}

从 xprotocol.connPool 取出一个client ,创建了一个协议对应的 Stream(变量名为 streamEncoder),对xprotocol 就是xStream,最终执行了AppendHeaders

func (r *upstreamRequest) OnReady(sender types.StreamSender, host types.Host) {
	r.requestSender = sender
	r.host = host
	r.requestSender.GetStream().AddEventListener(r)
	r.startTime = time.Now()

	endStream := r.sendComplete && !r.dataSent && !r.trailerSent
    //发送数据
	r.requestSender.AppendHeaders(r.downStream.context, r.convertHeader(r.downStream.downstreamReqHeaders), endStream)

	r.downStream.requestInfo.OnUpstreamHostSelected(host)
	r.downStream.requestInfo.SetUpstreamLocalAddress(host.AddressString())
}
func (s *xStream) AppendHeaders(ctx context.Context, headers types.HeaderMap, endStream bool) (err error) {
	// type assertion
	// hijack process
    s.frame = frame
	// endStream
	if endStream {
		s.endStream()
	}
	return
}
func (s *xStream) endStream() {
    // replace requestID
    s.frame.SetRequestId(s.id)
    // remove injected headers
    buf, err := s.sc.protocol.Encode(s.ctx, s.frame)
    err = s.sc.netConn.Write(buf)
}

网络层的write

func (c *connection) Write(buffers ...buffer.IoBuffer) (err error) {
    //同样经过过滤器
	fs := c.filterManager.OnWrite(buffers)
	if fs == api.Stop {
		return nil
	}
	if !UseNetpollMode {
		if c.useWriteLoop {
			c.writeBufferChan <- &buffers
		} else {
			err = c.writeDirectly(&buffers)
		}
	} else {
		//netpoll模式写
	}
	return
}
//在对应的eventloop.go中的startWriteLoop方法:
func (c *connection) startWriteLoop() {
	var err error
	for {
		select {
		case <-c.internalStopChan:
			return
		case buf, ok := <-c.writeBufferChan:
			c.appendBuffer(buf)
            c.rawConnection.SetWriteDeadline(time.Now().Add(types.DefaultConnWriteTimeout))
			_, err = c.doWrite()
		}
	}
}

请求数据发出去以后当前协程就阻塞了,看下waitNotify方法的实现:

func (s *downStream) waitNotify(id uint32) (phase types.Phase, err error) {
	if s.ID != id {
		return types.End, types.ErrExit
	}
	//阻塞等待
	select {
	case <-s.notify:
	}
	return s.processError(id)
}

与envoy 对比

envoy 对应逻辑 深入解读Service Mesh的数据面Envoy

深入解读Service Mesh的数据面Envoy下文以envoy 实现做一下类比 用来辅助理解mosn 相关代码的理念:

对于每一个Filter,都调用onData函数,咱们上面解析过,其中HTTP对应的ReadFilter是ConnectionManagerImpl,因而调用ConnectionManagerImpl::onData函数。ConnectionManager 是协议插件的处理入口,同时也负责对整个处理过程的流程编排

补充细节

不同颜色 表示所处的 package 不同

一次http1协议请求的处理过程(绿色部分表示另起一个协程)

代码的组织(pkg/stream,pkg/protocol,pkg/proxy) 跟架构是一致的

  1. pkg/types/connection.go Connection
  2. pkg/types/stream.go StreamConnection is a connection runs multiple streams
  3. pkg/types/stream.go Stream is a generic protocol stream
  4. 一堆listener 和filter 比较好理解:Method in listener will be called on event occur, but not effect the control flow.Filters are called on event occurs, it also returns a status to effect control flow. Currently 2 states are used: Continue to let it go, Stop to stop the control flow.

学到的

不要硬看代码,尤其对于多协程程序

  1. 打印日志
  2. debug.printStack 来查看某一个方法之前的调用栈
  3. fmt.Printf("==> %T\n",xx) 如果一个interface 有多个“实现类” 可以通过%T 查看struct 的类型