技术

agentic chat 图数据库的一些考量 LLM一些探索 Agent实践 LLM预训练 向量数据库的一些考量 fastapi+sqlalchemy进行项目开发 LLM微调实践 Python协程实现 Agent Functon Calling LLamaIndex入门 Multi-Agent探索 Python虚拟机 LLM工作流编排 Python实践 下一个平台Agent 激发LLM涌现——提示工程 LLM微调理论 大佬沉思 LLM外挂知识库 LLMOps 多模态LLM Python一些比较有意思的库 Transformers源码学习 LangChain源码学习 通用分布式计算引擎Ray Python并发 go依赖注入 go collection gc的基本原理 golang性能分析及优化 数据湖 高性能计算与存储 Linux2.1.13网络源代码学习 《大数据经典论文解读》 三驾马车学习 Spark 内存管理及调优 Yarn学习 从Spark部署模式开始讲源码分析 容器狂占内存资源怎么办? 多角度理解一致性 golang io使用及优化模式 Flink学习 c++学习 学习ebpf go设计哲学 ceph学习 学习mesh kvm虚拟化 学习MQ go编译器以及defer实现 学习go 为什么要有堆栈 汇编语言 计算机组成原理 运行时和库 Prometheus client mysql 事务 mysql 事务的隔离级别 mysql 索引 坏味道 学习分布式 学习网络 学习Linux go堆内存分配 golang 系统调用与阻塞处理 Goroutine 调度过程 重新认识cpu mosn有的没的 负载均衡泛谈 单元测试的新解读 《Redis核心技术与实现》笔记 《Prometheus监控实战》笔记 Prometheus 告警学习 calico源码分析 对容器云平台的理解 Prometheus 源码分析 并发的成本 基础设施优化 hashicorp raft源码学习 docker 架构 mosn细节 与微服务框架整合 Java动态代理 编程范式 并发通信模型 《网络是怎样连接的》笔记 go channel codereview gc分析 jvm 线程实现 go打包机制 go interface及反射 如何学习Kubernetes 《编译原理之美》笔记——后端部分 《编译原理之美》笔记——前端部分 Pilot MCP协议分析 go gc 内存管理玩法汇总 软件机制 istio流量管理 Pilot源码分析 golang io 学习Spring mosn源码浅析 MOSN简介 《datacenter as a computer》笔记 学习JVM Tomcat源码分析 Linux可观测性 学习存储 学计算 Gotty源码分析 kubernetes operator kaggle泰坦尼克问题实践 kubernetes扩缩容 神经网络模型优化 直觉上理解深度学习 如何学习机器学习 TIDB源码分析 什么是云原生 Alibaba Java诊断工具Arthas TIDB存储——TIKV 《Apache Kafka源码分析》——简介 netty中的线程池 guava cache 源码分析 Springboot 启动过程分析 Spring 创建Bean的年代变迁 Linux内存管理 自定义CNI IPAM 共识算法 spring redis 源码分析 kafka实践 spring kafka 源码分析 Linux进程调度 让kafka支持优先级队列 Codis源码分析 Redis源码分析 C语言学习 《趣谈Linux操作系统》笔记 docker和k8s安全访问机制 jvm crash分析 Prometheus 学习 Kubernetes监控 Kubernetes 控制器模型 容器日志采集 容器狂占资源怎么办? Kubernetes资源调度——scheduler 时序性数据库介绍及对比 influxdb入门 maven的基本概念 《Apache Kafka源码分析》——server Kubernetes类型系统 源码分析体会 《数据结构与算法之美》——算法新解 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 ansible学习 Kubernetes源码分析——从kubectl开始 jib源码分析之Step实现 线程排队 jib源码分析之细节 跨主机容器通信 jib源码分析及应用 为容器选择一个合适的entrypoint kubernetes yaml配置 《持续交付36讲》笔记 mybatis学习 程序猿应该知道的 无锁数据结构和算法 CNI——容器网络是如何打通的 为什么很多业务程序猿觉得数据结构和算法没用? 串一串一致性协议 当我在说PaaS时,我在说什么 《数据结构与算法之美》——数据结构笔记 PouchContainer技术分享体会 harbor学习 用groovy 来动态化你的代码 精简代码的利器——lombok 学习 《深入剖析kubernetes》笔记 编程语言那些事儿 rxjava3——背压 rxjava2——线程切换 spring cloud 初识 《深入拆解java 虚拟机》笔记 《how tomcat works》笔记 hystrix 学习 rxjava1——概念 Redis 学习 TIDB 学习 如何分发计算 Storm 学习 AQS1——论文学习 Unsafe Spark Stream 学习 linux vfs轮廓 《自己动手写docker》笔记 java8 实践 中本聪比特币白皮书 细读 区块链泛谈 比特币 大杂烩 总纲——如何学习分布式系统 hbase 泛谈 forkjoin 泛谈 看不见摸不着的cdn是啥 《jdk8 in action》笔记 程序猿视角看网络 bgp初识 calico学习 AQS——粗略的代码分析 我们能用反射做什么 web 跨域问题 《clean code》笔记 《Elasticsearch权威指南》笔记 mockito简介及源码分析 2017软件开发小结—— 从做功能到做系统 《Apache Kafka源码分析》——clients dns隐藏的一个坑 《mysql技术内幕》笔记 log4j学习 为什么netty比较难懂? 递归、回溯、动态规划 apollo client源码分析及看待面向对象设计 学习并发 docker运行java项目的常见问题 OpenTSDB 入门 spring事务小结 分布式事务 javascript应用在哪里 《netty in action》读书笔记 netty对http2协议的解析 ssl证书是什么东西 http那些事 苹果APNs推送框架pushy apple 推送那些事儿 编写java框架的几大利器 java内存模型和jvm内存布局 java exception Linux IO学习 netty内存管理 测试环境docker化实践 netty在框架中的使用套路 Nginx简单使用 《Linux内核设计的艺术》小结 Go并发机制及语言层工具 Linux网络源代码学习——数据包的发送与接收 《docker源码分析》小结 docker namespace和cgroup zookeeper三重奏 数据库的一些知识 Spark 泛谈 链式处理的那些套路 netty回顾 Thrift基本原理与实践(二) Thrift基本原理与实践(一) 回调 异步执行抽象——Executor与Future Docker0.1.0源码分析 java gc Jedis源码分析 深度学习泛谈 Linux网络命令操作 JTA与TCC 换个角度看待设计模式 Scala初识 向Hadoop学习NIO的使用 以新的角度看数据结构 并发控制相关的硬件与内核支持 systemd 简介 quartz 源码分析 基于docker搭建测试环境(二) spring aop 实现原理简述 自己动手写spring(八) 支持AOP 自己动手写spring(七) 类结构设计调整 分析log日志 自己动手写spring(六) 支持FactoryBean 自己动手写spring(九) 总结 自己动手写spring(五) bean的生命周期管理 自己动手写spring(四) 整合xml与注解方式 自己动手写spring(三) 支持注解方式 自己动手写spring(二) 创建一个bean工厂 自己动手写spring(一) 使用digester varnish 简单使用 关于docker image的那点事儿 基于docker搭建测试环境 分布式配置系统 JVM执行 git maven/ant/gradle/make使用 再看tcp kv系统 java nio的多线程扩展 《Concurrency Models》笔记 回头看Spring IOC IntelliJ IDEA使用 Java泛型 vagrant 使用 Go常用的一些库 Python初学 Goroutine 调度模型 虚拟网络 《程序员的自我修养》小结 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件 Go基础 JVM类加载 硬币和扑克牌问题 LRU实现 virtualbox 使用 ThreadLocal小结 docker快速入门

架构

bert rerank微调 大模型推理tips RAG向量检索与微调 dddfirework源码分析 RAG与知识图谱 大模型推理服务框架vLLM 大模型推理服务框架 模型服务化(未完成) 大模型Post-Training 大模型训练 大模型推理 从Attention到Transformer k8s设备管理 ddd从理念到代码 如何应用LLM 小鼠如何驾驭大象(LLM)? 多类型负载协调员Koordinator controller-runtime细节分析 finops学习 kubevela多集群 kubevela中cue的应用 基于k8s的工作流 kubevela源码分析 容器和CPU那些事儿 数据集管理fluid 应用管理平台kubevela karmada支持crd 多集群管理 AutoML和AutoDL 特征平台 实时训练 分布式链路追踪 K8S YAML 资源清单管理方案 tensorflow原理——python层分析 如何学习tensorflow 数据并行——allreduce 数据并行——ps 推荐系统embedding原理及实践 机器学习中的python调用c 机器学习训练框架概述 tensornet源码分析 大模型训练和推理 X的生成——特征工程 tvm tensorflow原理——core层分析 模型演变 《深度学习推荐系统实战》笔记 keras 和 Estimator tensorflow分布式训练 分布式训练的一些问题 基于Volcano的弹性训练 图神经网络 pytorch弹性分布式训练 从混部到统一调度 从RNN到Attention pytorch分布式训练 CNN 《动手学深度学习》笔记 pytorch与线性回归 多活 volcano特性源码分析 推理服务 kubebuilder 学习 mpi 学习pytorch client-go学习 提高gpu 利用率 GPU与容器的结合 GPU入门 AI云平台梳理 tensorflow学习 tf-operator源码分析 k8s批处理调度/Job调度 喜马拉雅容器化实践 Kubernetes 实践 学习rpc BFF openkruise学习 可观察性和监控系统 基于Kubernetes选主及应用 《许式伟的架构课》笔记 Admission Controller 与 Admission Webhook 发布平台系统设计 k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 深入controller openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go informer源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? 《软件设计之美》笔记 mecha 架构学习 Kubernetes events学习及应用 CRI——kubelet与容器引擎之间的接口 资源调度泛谈 业务系统设计原则 grpc学习 元编程 以应用为中心 istio学习 下一代微服务Service Mesh 《实现领域驱动设计》笔记 概率论 serverless 泛谈 《架构整洁之道》笔记 处理复杂性 那些年追过的并发 服务器端编程 网络通信协议 架构大杂烩 如何学习架构 《反应式设计模式》笔记 项目的演化特点 反应式架构摸索 函数式编程的设计模式 服务化 ddd反模式——CRUD的败笔 研发效能平台 重新看面向对象设计 业务系统设计的一些体会 函数式编程 《左耳听风》笔记 业务程序猿眼中的微服务管理 DDD实践——CQRS 项目隔离——案例研究 《编程的本质》笔记 系统故障排查汇总及教训 平台支持类系统的几个点 代码腾挪的艺术 abtest 系统设计汇总 《从0开始学架构》笔记 初级权限系统设计 领域驱动理念 现有上传协议分析 移动网络下的文件上传要注意的几个问题 推送系统的几个基本问题 做配置中心要想好的几个基本问题 不同层面的异步 分层那些事儿 性能问题分析 用户认证问题 资源的分配与回收——池 消息/任务队列

标签

k8s设备管理 多类型负载协调员Koordinator controller-runtime细节分析 finops学习 kubevela多集群 kubevela中cue的应用 基于k8s的工作流 kubevela源码分析 容器和CPU那些事儿 数据集管理fluid 应用管理平台kubevela karmada支持crd 多集群管理 K8S YAML 资源清单管理方案 从混部到统一调度 volcano特性源码分析 kubebuilder 学习 client-go学习 tf-operator源码分析 k8s批处理调度/Job调度 喜马拉雅容器化实践 Kubernetes 实践 openkruise学习 基于Kubernetes选主及应用 Admission Controller 与 Admission Webhook k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 深入controller openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go informer源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? Kubernetes events学习及应用 CRI——kubelet与容器引擎之间的接口 资源调度泛谈 如何学习Kubernetes 以应用为中心 kubernetes operator kubernetes扩缩容 serverless 泛谈 什么是云原生 自定义CNI IPAM docker和k8s安全访问机制 Kubernetes监控 Kubernetes 控制器模型 Kubernetes资源调度——scheduler Kubernetes类型系统 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 Kubernetes源码分析——从kubectl开始 kubernetes yaml配置 CNI——容器网络是如何打通的 当我在说PaaS时,我在说什么 《深入剖析kubernetes》笔记 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件
agentic chat bert rerank微调 大模型推理tips LLM一些探索 Agent实践 LLM预训练 RAG向量检索与微调 LLM微调实践 RAG与知识图谱 大模型推理服务框架vLLM Agent Functon Calling LLamaIndex入门 Multi-Agent探索 LLM工作流编排 大模型推理服务框架 模型服务化(未完成) 大模型Post-Training 大模型训练 大模型推理 从Attention到Transformer 下一个平台Agent 激发LLM涌现——提示工程 LLM微调理论 大佬沉思 LLM外挂知识库 LLMOps 多模态LLM Transformers源码学习 LangChain源码学习 如何应用LLM 小鼠如何驾驭大象(LLM)? AutoML和AutoDL 特征平台 实时训练 tensorflow原理——python层分析 如何学习tensorflow 数据并行——allreduce 数据并行——ps 推荐系统embedding原理及实践 机器学习中的python调用c 机器学习训练框架概述 tensornet源码分析 大模型训练和推理 X的生成——特征工程 tvm tensorflow原理——core层分析 模型演变 《深度学习推荐系统实战》笔记 keras 和 Estimator tensorflow分布式训练 分布式训练的一些问题 基于Volcano的弹性训练 图神经网络 pytorch弹性分布式训练 从RNN到Attention pytorch分布式训练 CNN 《动手学深度学习》笔记 pytorch与线性回归 推理服务 mpi 学习pytorch 提高gpu 利用率 GPU与容器的结合 GPU入门 AI云平台梳理 tensorflow学习 kaggle泰坦尼克问题实践 神经网络模型优化 概率论 直觉上理解深度学习 如何学习机器学习 深度学习泛谈

通用分布式计算引擎Ray

2023年08月23日

前言

一个应用场景: 笔者在做一个知识库应用,涉及到对文档的解析(一般用到gpu)、切片、向量化、存到vdb,抽象一下,一个workflow涉及到多个step,有的step很快有的很慢,有的用到cpu有的用到gpu,某个step出错了要支持重试,大批量文件时,可以自动扩展集群规模,提高实例速度。此时有几种实现方式

  1. 启动多个进程,一个进程干所有活儿,一个函数实现实现所有过程,一个子函数一个step,每个子函数配置重试策略(此时复函数无需重试)
  2. 启动多个进程,一个进程干一个step,进程之间通过mq/db 传递信息。相对与1的好处是,部分step可以指定使用gpu。缺点是代码开发、部署麻烦(多个进程嘛)。 怎么兼得?代码写在一起,一个step一个函数,指定有的step 用cpu,有的用gpu。代码回归到dsl 的本质,而处理方式则可以根据集群规模、资源诉求任意扩展。

Ray 是一个并行和分布式 Python 的开源库。其定位是一个通用的分布式编程框架,提供了统一的分布式底盘,能帮助用户将自己的程序快速分布式化。从高层次上看,Ray 生态系统由三部分组成:。Ray Core 提供了 low level 的分布式语法,如 remote func、remote class; Ray AIR 提供了 AI 场景的相关库(包括原生库和第三方库),以及用于在任何集群或云提供商上启动集群的工具。

如何用 Python 实现分布式计算?Ray 是基于 Python 的分布式计算框架,采用动态图计算模型,提供简单、通用的 API 来创建分布式应用。使用起来很方便,你可以通过装饰器的方式,仅需修改极少的的代码,让原本运行在单机的 Python 代码轻松实现分布式计算,目前多用于机器学习。

  1. 提供用于构建和运行分布式应用程序的简单原语。
  2. 使用户能够并行化单机代码,代码更改很少甚至为零。
  3. Ray Core 包括一个由应用程序、库和工具组成的大型生态系统,以支持复杂的应用程序。比如 Tune、RLlib、RaySGD、Serve、Datasets、Workflows。

将python分布式+并行化

# 一个装饰器就搞定分布式计算
ray.init()  # 在本地启动 ray,如果想指定已有集群,在 init 方法中指定 RedisServer 即可
@ray.remote   # 声明了一个 remote function,是 Ray 的基本任务调度单元,它在定义后,会被立马序列化存储到 RedisServer 中,并且分配一个唯一的 ID,这样就能保证集群所有节点都能看到这个函数的定义;
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))  # [0, 1, 4, 9]    可以通过 ObjectID 获取 ObjectStore 内的对象并将之转换为 Python 对象,这个方法是阻塞的,会等到结果返回;

装饰器 @ray.remote 也可以装饰一个类:

import ray
ray.init()

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0
    def increment(self):
        self.n += 1
    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
tmp1 = [c.increment.remote() for c in counters]
tmp2 = [c.increment.remote() for c in counters]
tmp3 = [c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures)) # [3, 3, 3, 3]

为什么

分布式计算框架Ray介绍当我们要构建一个涉及大规模数据处理或者复杂计算的应用,传统的方式是使用现成的大数据框架,例如 Apache Flink 和 Apache Spark。这些系统提供的API通常基于某种特定的计算范式(例如DataStream、DataSet),要求用户基于这些特定的计算范式实现应用逻辑。对于传统的数据清洗、数据分析等应用,这种用法能够很好地适用。但是,随着分布式应用的逻辑越来越复杂(例如分布式机器学习应用),许多应用的逻辑并不能直接套用现有的计算范式。在这种情况下,开发者如果想要细粒度地控制系统中的任务流,就需要自己从头编写一个分布式应用但是现实中,开发一个分布式应用并不简单。除了应用本身的代码逻辑,我们还需要处理许多分布式系统中常见的难题,例如:分布式组件通信、服务部署、服务发现、监控、异常恢复等。处理这些问题,通常需要开发者在分布式系统领域有较深的经验,否则很难保证系统的性能和健壮性。为了简化分布式编程,Ray提供了一套简单、通用的分布式编程API,屏蔽了分布式系统中的这些常见的难题,让开发者能够使用像开发单机程序一样简单的方式,开发分布式系统。

云原生场景下如何利用Ray快速构建分布式系统Ray 的通用性体现在哪里呢?Ray的设计思想是不绑定任何计算模式,把单机编程中的基本概念分布式化。从API 的设计可以看出,Ray并不是一个大数据系统,尤其是Ray Core这一层没有任何大数据相关的算子,而是从单机编程的基本概念进行分布式化的。具体如何分布式化?我们在单机编程中经常用到两个非常核心的概念,一个叫Function,一个叫Class,在面性对象的编程语言里面,基本上大家会围绕这两个概念进行代码开发,在Ray中会将这两个基本概念进行分布式化,对应到分布式系统就叫Task和Actor

设计

Ray 的框架中最为重要的两个部分是 Ray Core 和 Ray AIR:

  1. Ray Core 是底层的分布式的计算框架,使用基于 actor 模型来实现的一套计算框架,它可以将 Python 的一个 Class 或者一个 Function 转成分布式的 actor 和 task,在所有的机器上分布式地进行运行,并且 tasks/actor 之间可以通过分布式的对象存储能力来达到共享的能力。基于这层api,可以非常容易将现有程序分布式化,这一层没有绑定任何计算模式以及语义。
  2. Ray AIR是Ray的上层API,主要将Ray的各种应用整合在一起。比如 xgboost-ray/RayTrain/RayServe

Ray 提供了 init、remote、put、get 等基本原语。但是通过这些基本原语达不到更细粒度的控制,例如针对不同的计算配置不同的CPU、内存。或者调度的时候,能够提供人工设定调度的方法。Ray 希望通过参数配置来实现任务、故障和应用的细粒度控制。配置能够达到什么样粒度的控制。

Ray 寻找能够在分布式系统上开发的一种通用方法。Ray API 能够让开发者容易的组合在一个分布式框架上组合不同类型的库。举个例子,Ray 任务和 Actor 可以在分布式训练(例如,torch.distributed)里面调用或者被调用。在这种情况下,Ray 设计为一个分布式的粘合系统,因为它的 API 是通用的,足够支持在不同的工作类型中作为界面层服务

  1. Ray 架构的核心原则是 API 的简单和通用,而核心系统的目标是性能(低开销和横向扩展)和可靠性。有时候,我们愿意牺牲也挺好的目的。举个例子,Ray 包含了一些组件,例如,分布式引用计数和分布式内存,它增加了架构的复杂度,但是对于性能和可靠性来说这是必须的
  2. 为了性能,Ray 基于 gRPC 构建,并且能够在很多场景下匹配甚至超过原生 gRPC 的性能。相对于 gRPC 本身,Ray 让应用平衡并行和分布式操作,和分布式内存共享(通过一个共享的对象存储来实现)更简单
  3. 为了可靠性,Ray 的内部协议设计提供了发生故障的时候的纠错性,同时减少了通用情况下的开销。Ray 开发了一个分布式引用计数协议来保障内存安全,并且提供了故障恢复的不同选项

通用分布式编程API:无状态计算单元Task

Task是对单机编程中的Function进行分布式化,是一个无状态的计算单元。Ray可以把一个任意的Python函数或Java静态方法转换成一个Task,在和调用程序不同的进程上的单个的函数调用。在这个过程中,Ray会在一个远程进程中执行这个函数。并且这个过程是异步的,这意味着我们可以并行地执行多个Task。PS:和python线程、进程的异步执行很像。

# `square`是一个普通的Python函数,`@ray.remote`装饰器表示我们可以把这个函数转化成Ray task,可以远程执行
@ray.remote
def square(x):
    return x * x
obj_refs = []
# `square.remote` 会异步地远程执行`square`函数(该function就会被调度到远程节点的某个进程中执行)。通过下面两行代码,我们并发地执行了5个Ray task。`square.remote`的返回值是一个`ObjectRef`对象,表示Task执行结果的引用。
for i in range(5):
    obj_refs.append(square.remote(i))
# 实际的task执行结果存放在Ray的分布式object store里,我们可以通过`ray.get`接口,同步地获取这些数据。
assert ray.get(obj_refs) == [0, 1, 4, 9, 16]

通用分布式编程API:分布式object

Obect Store是Ray架构中的一个关键组件,Task计算的中间结果会存放在分布式Object Store中。除此之外,我们也可以使用put接口,显式地把一个Python或Java对象存放到Object Store中。

我们在Node 1运行heavy_compute function,这个 function 会使用remote通过Ray底层的调度系统调度到Node 2, Node 2会执行这个function,执行完成后,把结果put到本地的object store中,object store 是Ray中的一个核心组件,最终结果返回到Caller端是通过Ray底层的 object store之间的object传输,把结果返回来给Caller端。

从整个的流程看, heavy_compute.remote 返回的是一个ObjectRef,并不是最终的结果。ObjectRef类似于单机编程中的future,只不过它是分布式的future,可以通过ray.get获取最终结果。

Ray的分布式 object store是非常核心的组件,完美支撑了Ray整套分布式API 的设计,其特点如下:

  1. 可以实现多节点之间object 传输;
  2. 同节点内是基于shared memory的设计,在此基础上,分布式系统的online传输,如果发生在单机两个进程之间的话,理论上可以达到 Zero Copy 的效果;
  3. Ray object store 有一套比较完整的自动垃圾回收机制,可以保证分布式系统运算过程中一旦有ObjectRef在系统中没有引用的时候,会触发对该object 进行GC;
  4. Object store有object spilling 的功能,可以自动将内存中的object spill到磁盘上,从而达到扩容整个分布式系统存储的目的。 PS: 分布式计算的核心就是状态中心、任务的分发与收集。

通用分布式编程API:有状态计算单元Actor

Actor将单机编程的Class概念进行分布式化。Ray使用Actor来表示一个有状态的计算单元。在Ray中,我们可以基于任意一个Python class或Java class创建Actor对象。这个Actor对象运行在一个远程的Python或者Java进程中。同时,我们可以通过ActorHandle远程调用这个Actor的任意一个方法(每次调用称为一个Actor Task),多个Actor Task在Actor进程中会顺序执行,并共享Actor的状态。

# `Counter`是一个普通的Python类,`@ray.remote`装饰器表示我们可以把这个类转化成Ray actor.
@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0
    def increment(self):
        self.value += 1
    def get_value(self):
        return self.value
# `Counter.remote`会基于`Counter`类创建一个actor对象,这个actor对象会运行在一个远程的Python进程中。在另外一台机器的另外一个节点上面去实例化这个class。
counter = Counter.remote()
# `Counter.remote`的返回值是一个`ActorHandle`对象。通过`ActorHandle`,我们可以远程调用Actor的任意一个方法(actor task)。通过.remote实现远程调用。
[counter.increment.remote() for _ in range(5)]
# Actor task的返回值也是一个`ObjectRef`对象。同样地,我们通过`ray.get`获取实际的数据。
assert ray.get(counter.get_value.remote()) == 5

其它

在Ray中,我们可以把一个Task输出的ObjectRef传递给另一个Task(包括Actor task)。在这种情况下,Ray会等待第一个Task执行结束之后,再开始执行第二个Task。同时,我们也可以把一个ActorHandle传递给一个Task,从而实现在多个远程Worker中同时远程调用一个Actor。通过这些方式,我们可以动态地、灵活地构建各种各样复杂的分布式任务流。

# 通过把一个task输出的`ObjectRef`传递给另一个task,我们定义了两个task的依赖关系。Ray会等待第一个task执行结束之后,再开始执行第二个task。
obj1 = square.remote(2)
obj2 = square.remote(obj1)
assert ray.get(obj2) == 16

除了Task和Actor两个基本概念,Ray还提供了一系列高级功能,来帮助开发者更容易地开发分布式应用。这些高级功能包括但不限于:设置Task和Actor所需的资源、Actor生命周期管理、Actor自动故障恢复、自定义调度策略、Python/Java跨语言编程。

如果没有Ray,在纯云原生的实现思路中,资源定制是写到 yaml 里边的。比如说训练需要多少GPU 或者计算节点需要多少CPU,都是在 yaml 中定制 container 的规格。Ray提供了另外一个选择,完全无感知的代码化的配置,用户可以在 runtime 的时候,或者在Ray的Task 或 Actor 的decorator 中加一个参数,就可以通过Ray系统的调度能力分配相应的资源,达到整个分布式系统资源定制的目的。Ray的资源定制除了支持GPU、CPU、Memory 之外,还可以插入自定义资源。然后Ray的调度还有一些高级功能,比如资源组,或者亲和性和反亲和性的调度,目前都是支持的。ray的调度中,所有的资源都是逻辑资源不是物理资源,ray会按照task/actor的资源需求来进行调度,但不会限制task/actor对资源的真实使用,没有资源隔离。比如一个节点有10cpus,actor(num_cpus=2)只能创建5个,每个actor使用多少线程/进程ray是不控制的。另外cpu/gpu可以是小数,但不代表ray会为进程做cpu/gpu共享。

@ray.remote(num_gpus=1)
def train_and_evaluate(model,train_indices,test_indices):
	...

依赖管理:在分布式系统中,往往不同分布式系统的组件对环境的要求是不一样的。如果使用常规思路,就需要把环境固化到image里面,通过 Dockerfile 去定制环境。Ray实现了更灵活的选择,也是代码化的,可以在runtime创建Task或Actor之前的任意时刻定制指定计算单元的运行时环境。上图中给worker 的 Task 设定一个runtime_env,定制一个专属的Python版本,并在该版本里面装入一些pip包,完成面向Python的隔离环境的定制。这时Ray集群内部会在创建这个Task之前去准备该环境,然后将该Task调度到该环境执行。

@ray.remote(runtime_env={"python_version":"3.9","pip"=["scikit-learn"]})
def train_and_evaluate(model,train_indices,test_indices):
	...

Ray的运行时环境是插件化的设计,用户可以根据自己的需求实现不同的插件,在Ray中原生支持了一些插件如Pip、Conda、Container等,只要是跟环境相关,不只是代码依赖,也可以是数据依赖,都可以通过插件去实现。

Ray中用户可以根据自己的环境定制的需求选择需要定制的环境的粒度。以Python为例,隔离性的支持力度有如下几个维度,一个是 Process 级别的隔离,第二是 Virtual env 级别的隔离,第三是 Conda 级别的隔离,最后是 Container级别隔离。从隔离性来说,从右到左是由弱到强的,Process 的隔离性是非常弱的,Container 隔离性是更强的。从用户体验来说,环境定制上 Container 是更重的而Process 是更轻的。

架构

整体设计

基于 Ray 的大规模离线推理 Ray 项目是 UC Berkeley 的 RISElab 实验室在 2017 年前后发起的,定位是通用的分布式编程框架——Python-first。理论上通过 Ray 引擎用户可以轻松地把任何 Python 应用做成分布式,尤其是机器学习的相关应用,目前 Ray 主攻的一个方向就是机器学习。Ray 的架构分为三层

  1. 最下面一层是各种云基础设施,也就是说 Ray 帮用户屏蔽了底层的基础设施,用户拉起一个 Ray Cluster之后就可以立即开始分布式的编程,不用考虑底层的云原生或各种各样的环境;
  2. 中间层是 Ray Core 层。这一层是 Ray 提供的核心基础能力,主要是提供了 Low-level 的非常简洁的分布式编程 API。基于这套 API,用户可以非常容易地把现有的 Python 的程序分布式化。值得注意的是,这一层的 API 是 Low-level,没有绑定任何的计算范式,非常通用;
  3. 最上层是 Ray 社区基于 Ray Core 层做的丰富的机器学习库,这一层的定位是做机器学习 Pipeline。比如,数据加工读取、模型训练、超参优化、推理,强化学习等,都可以直接使用这些库来完成整个的 Pipeline,这也是 Ray 社区目前主攻的一个方向。

上图展示的是 Ray Cluster 的基本架构,每一个大框就是一个节点。(这里的节点是一个虚拟的概念,可以是一个物理机,一个 VM 或一个 Linux 的 Docker。比如在 K8s 上,一个节点就是一个 Pod。)

  1. Head 节点:是 Ray Cluster 的调度中心,比较核心的组件是 GCS(Global Control Service),GCS 储存了代码、输入参数、返回值,一些元数据比如actor的地址,负责整个集群的资源调度和节点管理,类似于Hadoop架构中Yarn里边的 Resource Manager。Head节点也有可观测性 Dashboard。
  2. Worker 节点:除了 Head 节点之外,其他都是 Worker 节点,承载具体的工作负载。
    1. Raylet:每个节点上面都有一个守护进程 Raylet,它是一个 Local Scheduler,负责Server内部的调度: Task 的调度以及 Worker 的管理。
    2. Object Store 组件:每个节点上都有 Object Store 组件,负责节点之间 Object 传输。在整个 Cluster 中每个节点的 Object Store 组件组成一个全局的分布式内存。同时,在单个节点上,Object Store 在多进程之间通过共享内存的方式减少 copy。
    3. 其它: 集中式的管理能够提供很好的调度监控,但是也会影响性能。GCS 希望能够较少被 worker 访问,因此采用了一些 Local Scheduler 的方式,避免 worker 对 GCS 的频繁调用。
  3. Driver:当用户向 Ray Cluster 上提交一个 Job,或者用 Notebook 连接的时候,Ray挑选节点来运行 Driver 进行,执行用户代码。作业结束后 Driver 销毁。
  4. Worker:是 Ray 中 Task 和 Actor 的载体。

Ray 的Low-level和 High-level API

在部署 Ray 时,开源社区有完整的解决方案 Kuberay 项目。每个 Ray Cluster 由 Head 节点和 Worker 节点组成,每个节点是一份计算资源,可以是物理机、Docker 等等,在 K8s 上即为一个 Pod。启动 Ray Cluster 时,使用 Kuberay 的 Operator 来管理整个生命周期,包括创建和销毁 Cluster 等等。Kuberay 同时也支持自动扩展和水平扩展。Ray Cluster 在内部用于收集负载的 Metrics,并根据 Metrics 决定是否扩充更多的资源,如果需要则触发 Kuberay 拉起新的 Pod 或删除闲置的 Pod。

用户可以通过内部的平台使用 Ray,通过提交 Job 或使用 Notebook 进行交互式编程。平台通过 Kuberay 提供的 YAML 和 Restful API 这两种方式进行操作。

ray 如何控制并发度?Ray 的调度器会动态地根据当前系统资源和任务负载情况来决定启动几个 raylet 进程。Ray 的设计目标之一是能够自适应地处理不同规模和负载的工作负载。 工作负载可以通过@ray.remote 指定 运行它所需要的cpu/gpu等资源。 PS: 不一定对。

Ray 如何远程执行 Python 代码

PS: 对于rpc 来说,client.func() 底层持有net_client 将func_name/func_arg 等传给server侧,而server 根据func_name/func_arg运行方法并返回结果。ray 则更进一步,server 无需事先持有func 代码,clinet.func() 实质是net_client将方法定义 传输给server 由server 执行并拿到返回值。 Ray 如何远程执行 Python 代码f 是用 python 定义的一个函数,Ray 本身是基于 C++ 实现的分布式计算。那怎么把 python 的代码 remote 到 C++ 的runtime 来执行呢?

import ray
import time
ray.init()

@ray.remote
def f(i):
    time.sleep(1)
    return i

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))

GCS 是 Global State Service,管理全局的状,Header 和 Worker 的不同点就在于 GCS。Raylet 是一个连接器,把 Worker (Header 也是一种 Worker) 都连接起来。Head node 有一个 Driver,支持 Python 和 Java。怎么理解支持 Python 和 Java(后面主要说 Python)。就是提供上面代码的样子,支持 import ray 以及后面 ray.init 和 remote 注解的方式来提交计算到各个节点,Raylet 在这个过程主要存储,以及和 GCS 通讯,传递计算状态和信息。Python 调用封装好的 C++ 库来和其他 Node 通讯。

Remote 注解在 remote_function.py 里面,简化如下(… 代表省略内容):

def _remote(...):
  # ...
  # There is an interesting question here. If the remote function is
  # used by a subsequent driver (in the same script), should the
  # second driver pickle the function again? If yes, then the remote
  # function definition can differ in the second driver (e.g., if
  # variables in its closure have changed). We probably want the
  # behavior of the remote function in the second driver to be
  # independent of whether or not the function was invoked by the
  # first driver. This is an argument for repickling the function,
  # which we do here.
  self._pickled_function = pickle.dumps(self._function)

  self._function_descriptor = PythonFunctionDescriptor.from_function(
    self._function, self._pickled_function
  # ...

首先把 pickled_function 用 pickle 来序列化。然后用 PythonFunctionDescriptor 来打包前后两个函数,PythonFunctionDescriptor 是用 cython 实现的的对 python 函数的封装,如下:

def from_function(cls, function, pickled_function):
  # ...
  module_name = function.__module__
  function_name = function.__name__
  class_name = "

  pickled_function_hash = hashlib.sha1(pickled_function).hexdigest()
  return cls(module_name, function_name, class_name, pickled_function_hash)

这里用 hash 把函数内容封装,然后调用 f.remote() 提交这个计算任务。在 remote_function.py 的 _remote 函数里的 invocation ,如下:

# ...
object_refs = worker.core_worker.submit_task(
                self._language,
                self._function_descriptor,
                list_args,
                name,
                num_returns,
                resources,
                max_retries,
                placement_group.id,
                placement_group_bundle_index,
                placement_group_capture_child_tasks,
                worker.debugger_breakpoint,
                override_environment_variables=override_environment_variables
                or dict())

上面描述了 Ray 是怎么封装 Python 代码成一个 hash 和 pickle 化了的代码。下面再看看 worker 是怎么调用这个代码的,Ray 的 Worker 执行部分的代码如下:

Status CoreWorker::ExecuteTask(...) {
    // ...
    RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()};

  // ...

  status = options_.task_execution_callback(
      task_type, task_spec.GetName(), func,
      task_spec.GetRequiredResources().GetResourceMap(), args, arg_reference_ids,
      return_ids, task_spec.GetDebuggerBreakpoint(), return_objects);

提交了任务和 python 代码之后, Ray 怎么在后台处理和执行这些任务呢。这里面包含了 Ray 的任务机制,还有基于 boost asio 的异步执行规范。

Ray 1.0 架构解读进程提交了 task,进程就作为 task 返回的所有者。可以通过 raylet 获取资源来执行 task。在这里,driver 是结果 “A” 的所有者,“Worker 1” 是结果 “B” 的所有者。当一个 task 被提交后,task 的所有者等待任何依赖在集群里可用。例如 ObjectRef 是我们作为参数带入到 task里的依赖。当这些依赖准备好的时候,所有者从分布式调度里面获得资源来执行这个 task。一旦资源可用,调度就批准这个提交。task 所有者在所在的 worker 通过 gRPC 发送一个特定的 “task 描述”来调度 task 。当执行 task 的时候,worker 必须存储这个执行的返回值。如果返回值很小,worker 就直接在所有者 worker 的内部返回,把返回值拷贝到进程内的对象存储。如果返回值很大,worker 在它的本地共享内存对象存储这个结果,并且反馈给所有者这个对象在分布式内存。这就允许所有者可以在它所在的本地 node 引用这个对象和获取这个对象(在 raylet 之间有一层通讯机制,调度 raylet 所在的共享内存的数据。worker 本身也有个消息传递机制/grpc,小的数据可以直接传输)。

其它

ray-python-example automl_service

Ray on ACK 实践探索之旅 - RayCluster 篇