技术

数据湖 高性能计算与存储 Linux2.1.13网络源代码学习 《大数据经典论文解读》 三驾马车学习 Spark 内存管理及调优 Yarn学习 从Spark部署模式开始讲源码分析 容器狂占内存资源怎么办? 多角度理解一致性 golang io使用及优化模式 Flink学习 c++学习 学习ebpf go设计哲学 ceph学习 学习mesh kvm虚拟化 学习MQ go编译器 学习go 为什么要有堆栈 汇编语言 计算机组成原理 运行时和库 Prometheus client mysql 事务 mysql 事务的隔离级别 mysql 索引 坏味道 学习分布式 学习网络 学习Linux go堆内存分配 golang 系统调用与阻塞处理 Goroutine 调度过程 重新认识cpu mosn有的没的 负载均衡泛谈 单元测试的新解读 《Redis核心技术与实现》笔记 《Prometheus监控实战》笔记 Prometheus 告警学习 calico源码分析 对容器云平台的理解 Prometheus 源码分析 并发的成本 基础设施优化 hashicorp raft源码学习 docker 架构 mosn细节 与微服务框架整合 Java动态代理 编程范式 并发通信模型 《网络是怎样连接的》笔记 go channel codereview gc分析 jvm 线程实现 go打包机制 go interface及反射 如何学习Kubernetes 《编译原理之美》笔记——后端部分 《编译原理之美》笔记——前端部分 Pilot MCP协议分析 go gc 内存管理玩法汇总 软件机制 istio流量管理 Pilot源码分析 golang io 学习Spring mosn源码浅析 MOSN简介 《datacenter as a computer》笔记 学习JVM Tomcat源码分析 Linux可观测性 学习存储 学计算 Gotty源码分析 kubernetes operator kaggle泰坦尼克问题实践 kubernetes扩缩容 神经网络模型优化 直觉上理解深度学习 如何学习机器学习 TIDB源码分析 什么是云原生 Alibaba Java诊断工具Arthas TIDB存储——TIKV 《Apache Kafka源码分析》——简介 netty中的线程池 guava cache 源码分析 Springboot 启动过程分析 Spring 创建Bean的年代变迁 Linux内存管理 自定义CNI IPAM 共识算法 spring redis 源码分析 kafka实践 spring kafka 源码分析 Linux进程调度 让kafka支持优先级队列 Codis源码分析 Redis源码分析 C语言学习 《趣谈Linux操作系统》笔记 docker和k8s安全访问机制 jvm crash分析 Prometheus 学习 Kubernetes监控 容器日志采集 Kubernetes 控制器模型 容器狂占资源怎么办? Kubernetes资源调度——scheduler 时序性数据库介绍及对比 influxdb入门 maven的基本概念 《Apache Kafka源码分析》——server Kubernetes类型系统 源码分析体会 《数据结构与算法之美》——算法新解 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 ansible学习 Kubernetes源码分析——从kubectl开始 jib源码分析之Step实现 jib源码分析之细节 线程排队 跨主机容器通信 jib源码分析及应用 为容器选择一个合适的entrypoint kubernetes yaml配置 《持续交付36讲》笔记 mybatis学习 程序猿应该知道的 无锁数据结构和算法 CNI——容器网络是如何打通的 为什么很多业务程序猿觉得数据结构和算法没用? 串一串一致性协议 当我在说PaaS时,我在说什么 《数据结构与算法之美》——数据结构笔记 PouchContainer技术分享体会 harbor学习 用groovy 来动态化你的代码 精简代码的利器——lombok 学习 《深入剖析kubernetes》笔记 编程语言那些事儿 rxjava3——背压 rxjava2——线程切换 spring cloud 初识 《深入拆解java 虚拟机》笔记 《how tomcat works》笔记 hystrix 学习 rxjava1——概念 Redis 学习 TIDB 学习 如何分发计算 Storm 学习 AQS1——论文学习 Unsafe Spark Stream 学习 linux vfs轮廓 《自己动手写docker》笔记 java8 实践 中本聪比特币白皮书 细读 区块链泛谈 比特币 大杂烩 总纲——如何学习分布式系统 hbase 泛谈 forkjoin 泛谈 看不见摸不着的cdn是啥 《jdk8 in action》笔记 程序猿视角看网络 bgp初识 calico学习 AQS——粗略的代码分析 我们能用反射做什么 web 跨域问题 《clean code》笔记 《Elasticsearch权威指南》笔记 mockito简介及源码分析 2017软件开发小结—— 从做功能到做系统 《Apache Kafka源码分析》——clients dns隐藏的一个坑 《mysql技术内幕》笔记 log4j学习 为什么netty比较难懂? 递归、回溯、动态规划 apollo client源码分析及看待面向对象设计 学习并发 docker运行java项目的常见问题 OpenTSDB 入门 spring事务小结 分布式事务 javascript应用在哪里 《netty in action》读书笔记 netty对http2协议的解析 ssl证书是什么东西 http那些事 苹果APNs推送框架pushy apple 推送那些事儿 编写java框架的几大利器 java内存模型和jvm内存布局 java exception Linux IO学习 netty内存管理 测试环境docker化实践 netty在框架中的使用套路 Nginx简单使用 《Linux内核设计的艺术》小结 Go并发机制及语言层工具 Linux网络源代码学习——数据包的发送与接收 《docker源码分析》小结 docker namespace和cgroup zookeeper三重奏 数据库的一些知识 Spark 泛谈 链式处理的那些套路 netty回顾 Thrift基本原理与实践(二) Thrift基本原理与实践(一) 回调 异步执行抽象——Executor与Future Docker0.1.0源码分析 java gc Jedis源码分析 深度学习泛谈 Linux网络命令操作 JTA与TCC 换个角度看待设计模式 Scala初识 向Hadoop学习NIO的使用 以新的角度看数据结构 并发控制相关的硬件与内核支持 systemd 简介 quartz 源码分析 基于docker搭建测试环境(二) spring aop 实现原理简述 自己动手写spring(八) 支持AOP 自己动手写spring(七) 类结构设计调整 分析log日志 自己动手写spring(六) 支持FactoryBean 自己动手写spring(九) 总结 自己动手写spring(五) bean的生命周期管理 自己动手写spring(四) 整合xml与注解方式 自己动手写spring(三) 支持注解方式 自己动手写spring(二) 创建一个bean工厂 自己动手写spring(一) 使用digester varnish 简单使用 关于docker image的那点事儿 基于docker搭建测试环境 分布式配置系统 JVM执行 git maven/ant/gradle/make使用 再看tcp kv系统 java nio的多线程扩展 《Concurrency Models》笔记 回头看Spring IOC IntelliJ IDEA使用 Java泛型 vagrant 使用 Go常用的一些库 Python初学 Goroutine 调度模型 虚拟网络 《程序员的自我修养》小结 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件 Go基础 JVM类加载 硬币和扑克牌问题 LRU实现 virtualbox 使用 ThreadLocal小结 docker快速入门

架构

多集群及clusternet学习 AutoML和AutoDL 特征平台 实时训练 分布式链路追踪 helm tensorflow原理——python层分析 如何学习tensorflow 数据并行——allreduce 数据并行——ps 机器学习中的python调用c 机器学习训练框架概述 embedding的原理及实践 tensornet源码分析 大模型训练 X的生成——特征工程 tvm tensorflow原理——core层分析 模型演变 《深度学习推荐系统实战》笔记 keras 和 Estimator tensorflow分布式训练 分布式训练的一些问题 基于Volcano的弹性训练 图神经网络 pytorch弹性分布式训练 在离线业务混部 RNN pytorch分布式训练 CNN 《动手学深度学习》笔记 pytorch与线性回归 多活 volcano特性源码分析 推理服务 kubebuilder 学习 mpi 学习pytorch client-go学习 tensorflow学习 提高gpu 利用率 GPU与容器的结合 GPU入门 AI云平台梳理 tf-operator源码分析 k8s批处理调度 喜马拉雅容器化实践 Kubernetes 实践 学习rpc BFF openkruise学习 可观察性和监控系统 基于Kubernetes选主及应用 《许式伟的架构课》笔记 Kubernetes webhook 发布平台系统设计 k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 controller 组件介绍 openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? 《软件设计之美》笔记 mecha 架构学习 Kubernetes events学习及应用 CRI 资源调度泛谈 业务系统设计原则 grpc学习 元编程 以应用为中心 istio学习 下一代微服务Service Mesh 《实现领域驱动设计》笔记 serverless 泛谈 概率论 《架构整洁之道》笔记 处理复杂性 那些年追过的并发 服务器端编程 网络通信协议 架构大杂烩 如何学习架构 《反应式设计模式》笔记 项目的演化特点 反应式架构摸索 函数式编程的设计模式 服务化 ddd反模式——CRUD的败笔 研发效能平台 重新看面向对象设计 业务系统设计的一些体会 函数式编程 《左耳听风》笔记 业务程序猿眼中的微服务管理 DDD实践——CQRS 项目隔离——案例研究 《编程的本质》笔记 系统故障排查汇总及教训 平台支持类系统的几个点 代码腾挪的艺术 abtest 系统设计汇总 《从0开始学架构》笔记 初级权限系统设计 领域驱动理念入门 现有上传协议分析 移动网络下的文件上传要注意的几个问题 推送系统的几个基本问题 用户登陆 做配置中心要想好的几个基本问题 不同层面的异步 分层那些事儿 性能问题分析 当我在说模板引擎的时候,我在说什么 用户认证问题 资源的分配与回收——池 消息/任务队列

标签


数据并行——ps

2022年03月25日

简介

第一手的材料 就是李沐大神的 论文Scaling Distributed Machine Learning with the Parameter Server 巨大的模型其实就是巨大的参数Parameter Server分布式训练概述(上篇)

基本过程

机器学习参数服务器ps-lite (1) —– PostOffice如果做一个类比,参数服务器是机器学习领域的分布式内存数据库,其作用是存储模型和更新模型。

我们来看看机器学习的几个步骤,这些步骤不断循环往复。

  1. 准备数据:训练进程拿到权重 weight 和数据(data + label);
  2. 前向计算:训练进程使用数据进行前向计算,得到 loss = f(weight, data & label);
  3. 反向求导:通过对 loss 反向求导,得到导数 grad = b(loss, weight, data & label);
  4. 更新权重:weight -= grad * lr;
  5. 来到1,进行下一次迭代; 如果使用参数服务器训练,我们可以把如上步骤对应如下:

  6. 参数下发:参数服务器服务端 将 weight 发给 每个worker(或者worker自行拉取),worker就是参数服务器Client端;
  7. 并行计算:每个worker 分别完成自己的计算(包括前向计算和反向求导);
  8. grad 收集:参数服务器服务端 从每个 Worker 处得到 grad,完成归并(或者worker自行推送);
  9. 更新权重:参数服务器服务端 自行将 grad 应用到 weight 上;
  10. 来到1,进行下一次迭代;

为什么弄一个ps server

  1. 分布式训练是一种特殊的分布式计算,反向传播时涉及到梯度的汇总与计算,相对于一般的spark/mapreduce 就复杂了不少
  2. 单节点的内存容量支持的模型大小有限。(embedding 的特点使得worker 每次只需要pull 部分weight, 一些optimizer 算法 可能要花费两三倍 weight的空间)。

以往的分布式计算,比如mapreduce的原理,是将任务和数据分配到多个节点中做并行计算,来缩短任务的执行时间。对于分布式训练来说,同样是希望将训练任务部署到多个节点上并行计算,但是因为反向传播涉及到梯度的汇总和权重的更新,相比之前的分布式计算更加复杂。也有采用hadoop或者spark的分布式训练,用单个driver或者master节点来汇总梯度及更新权重,但是受限于单节点的内存容量,支持的模型大小有限。另外,集群中运行最慢的那个worker,会拖累整个训练的迭代速度。为了解决这些问题,Parameter Server做了新的设计。

两点设计使ps能够克服Master/Slave架构应用于大规模分布式训练时的困难:

  1. 所有参数不再存储于单一的master节点,而是由一群ps server节点负责存储、读写;
  2. 得益于推荐系统的特征是超级特征的特点,一个batch的训练数据所包含的feature数目是有限的,因此,我们没必要训练每个batch时都将整个模型(i.e., 上亿的embedding)在server/worker之间传来传去,而只传递当前batch所涵盖的有限几个参数就可以了。

几点设计

优化点:每个kv都是都是很小的值,如果对每个key都发送一次请求,那么服务器会不堪重负。为了解决这个问题,可以考虑利用机器学习算法中参数的数学特点(即参数一般为矩阵或者向量),将很多参数打包到一起进行更新。

异步

每个worker 互相不干扰,各自 pull 参数,然后计算梯度后,再通过 push 将梯度值回传给 server。server 再汇总所有 worker 的梯度结果后一起更新最终参数。这里的异步有2个方面

  1. optimizer 更新参数
    1. 同步训练:每一轮迭代所有worker都要保持同步,每个worker只进行一次前向和后向计算,server收集所有worker的梯度求平均并更新参数,然后进行下一轮迭代。对于同步训练,每一次迭代的训练速度,取决于整个系统中最慢的worker和最慢的server。
    2. 异步训练,每个worker各自进行前后向计算,不需要等待其他worker,持续地进行迭代。而在server侧, 只要有worker push新的梯度过来,就会更新参数。
  2. 另一个问题是pull-计算-push 这个操作太频繁,通信有压力,拖慢计算。所以可以采取时效性不那么高的方法,就是不必每次都 pull 和 push,比如worker计算出第10次迭代的梯度之后,立即进行第11次迭代,而不需要pull新的权重过来,没有等待时间,效率较高;

重温经典之ps-lite源码解析(1):基础在纯异步的ASP模式中,每台worker在发送完自己的梯度后,不用等其他worker,就可以开始训练下一个batch的数据。由于无须同步,ASP的性能优势比较明显。但是,的确存在可能性,一个非常慢的worker基于老参数计算出过时梯度,传到server端会覆盖一些参数的最新进展。但是在实际场景下,由于推荐系统的特征空间是超级稀疏的,因此两个worker同时读写同一feature造成冲突的可能性还是较低的,因此纯异步ASP模式的应用还是比较普遍的。

浅谈工业界分布式训练(一)同步更新虽然可以保证Consistency,但由于各节点计算能力不均衡无法保证性能,而异步更新或者半同步更新并没有理论上的收敛性证明,Hogwild!算法证明了异步梯度下降在凸优化问题上按概率收敛,而深度学习问题一般面对的是非凸问题,所以无论异步和半同步算法都无收敛性的理论保障。所以只是根据经验,大部分算法在异步更新是可以收敛,求得最优或者次优解(其实现在无论是学术界和工业界,深度学习只要效果好就行)。当然目前比较好的方式针对针对SparseNet部分( 低IO pressure, 但高memory consumption),DenseNet部分 (高IO pressure,但低memory consumption)的特点,对sparsenet进行异步更新(因为Embedding Lookuptable的更新是稀疏的,每个worker更新不同的特征,冲突率较低),DenseNet采用同步更新/AllReduce的方式尽量逼近同步训练的效果。PS:为了防止模型不收敛,一般框架同步异步都会支持。hogwild 一般特指 parameter server 最常见的用法:完全无锁的异步训练。hogwild 这个术语本身在学术界用的更多,工程上用的比较少。

分布式

ps server 并不是只有一个master 来分发所有参数,而是存在一个 Server group,即多个 server 机器,每个机器只负责存一部分参数就行。这样就避免唯一 master 不停广播的通信代价问题。前面说了,server 存的是<key,value>,每个 server 上这个 key 的范围就是通多一致性哈希来分配的。这样设计有个好处,就是加入一个新节点,或者丢失删除一个节点后,参数可以通过环形只在相邻的节点上调整即可,避免节点频繁大规模的参数变动。

ElasticDL Parameter Server Design

  1. 可以存的更多。models could be large and overrun the memory space of a single PS instance. In such case, we need to partition the model and store different partitions in different PS instances.
  2. 分担通信负担。distributes the model parameter communication from workers among PS instances.

存储两类数据

广告推荐中大规模分布式模型 ps server 上存储的参数格式是<key, value>对,支持set/get/update 以及自定义函数。每个 worker 读入一个 batch 数据后,会向 server 执行 pull 操作,获取当前计算需要的参数的最新的值。比如稀疏参数的获取,发现样本中,location 这一维特征有北京,上海,南京3个sign,那么则将这3个当做 key 去请求 server 获得当前的最新 embedding 向量。计算前向和梯度,也是需要 dense 模型参数的,所以也会 pull DNN 网络的参数。

Each PS node has a dictionary-based data structure to store its partition of model parameters.We consider two kinds of model parameters:

  1. non-embedding parameters,一般不大,不分区,存在一个ps 上,tf.Variable name 作为key,tf.Variable 作为value。可以使用 hash(p_name) % N 选择存储的ps
  2. embedding tables,Each embedding layer has an embedding table which maps a discrete ID i to an embedding vector vᵢ. Embedding tables could be huge, especially in some recommending and ranking models. Thus, we partition each embedding table and store every partition in an unique PS pod. For an embedding vector vᵢ, we select the (i mod N)-th parameter server to store it. To store an embedding vector, We use its corresponding embedding layer name and discrete ID as the key, and a 1-D numpy.ndarry as the value. PS:例如一个形状为 [m, n, l, k] 的 tensor 可以按切片的数量保存为 m 个形状为 [n, l, k] 的 KV 数据,key 为 tensor_name 和 m 维度序号组合的唯一命名。 针对大规模 Embedding 参数的定制处理

初始化:We use lazy initialization for model parameters in PS. PS does not have the model definition. Thus workers are responsible for initializing parameters and push the initialized parameters to corresponding PS pods. Each PS pod has a parameter initialization status, which is False after the PS pod launch.

  1. When a worker tries to get non-embedding parameters from the PS pod through a RPC call pull_variable, the PS pod tells the worker that the parameter initialization status is False in response. If the worker has already initialized non-embedding parameters, it sends non-embedding parameter values to the PS pod by a gRPC call push_model. When the PS pod receives non-embedding parameters in its first RPC service for push_model, it initializes non-embedding parameters and sets the parameter initialization status as True.PS: 这也是为什么 ps 挂了没事,worker 会push
  2. For an embedding vector, the corresponding PS pod will initialize it in the first pull_embedding_vector service that contains this embedding vector. The PS pod needs the embedding vector size and the initialization method for the initialization. The embedding vector size and the initialization method are in the model definition and workers can send them in push_model to PS pods together with non-embedding parameter values.

参数更新:

  1. A worker computes gradients in each training iteration, which contain gradients for non-embedding parameters and some embedding vectors if applicable. The worker partitions these gradients using their corresponding parameter names or discrete IDs for embedding vectors. Then the worker sends gradient partitions to their corresponding PS pods by RPC calls push_gradient.When a PS pod receives gradients in push_gradient, it uses a TensorFlow optimizer to apply gradients to non-embedding parameters.
  2. We have already implemented an OptimizeWrapper to sparsely update embedding vectors. OptimizeWrapper uses corresponding embedding vectors to form a temporary variable, applies gradients to this temporary variable, and writes results back to these embedding vectors. The PS pod can use this OptimizeWrapper directly to update embedding vectors.

故障恢复:The model may contain one or more embedding layers with embedding tables as their parameters. If so, a minibatch of training data in a worker contains some embedding IDs, which correspond to a subset of embedding tables. The worker pulls all non-embedding parameters and only a subset of embedding tables from PS pods in the training. Thus, the PS pod can recover non-embedding parameters from workers but not embedding tables.

  1. For non-embedding parameters, the PS pod can recover them from workers in the same way as the parameter initialization by setting its parameter initialization status as False.
  2. For embedding tables, PS creates replicas to support fault-tolerance. For each PS pod PSᵢ, it can store M replicas of its embedding table partitions in M PS pods indexed from (i+1) mod N to (i+M) mod N. The relaunched PS pod can recover embedding tables from one of its replicas. PS: 一个ps 存了两份 replica,还周期性的同步呢。

Live replication of parameters between servers supports hot failover. Failover and selfrepair in turn support dynamic scaling by treating machine removal or addition as failure or repair respectively. PS:多副本 ==> 容错 ==> 弹性。每个参数会在PS集群中有三个副本,存储在不同的节点上来实现冗余。其中一个节点会被选为primary,来提供针对某个参数的服务。当这个节点失效时,另外两个副本中会被挑选出一个作为新的primary,来继续此参数的服务。因而,参数服务器也是需要调度的。

实现

一个大神的简单 c实现 Superjomn/SwiftSnails

ElasticDL——ps实现

ElasticDL Parameter Server Design

Message Definition

message Tensor {
    enum DataType {
        BOOL = 0;
        INT16 = 1;
        INT32 = 2;
        INT64 = 3;
        FP16 = 4;
        FP32 = 5;
        FP64 = 6;
    }
    string name = 1;
    DataType data_type = 2;
    repeated int64 dim = 3;
    bytes content = 4;
    repeated int64 indices = 5;
}
message EmbeddingTableInfo{
    string name = 1;
    repeated int64 dim = 2;
    string initializer = 3;
}
message Model {
    int64 version = 1;
    # repeated 则表示数组
    repeated Tensor variables = 2;
    repeated EmbeddingTableInfo embedding_table_info = 3;
}
message PullVariableRequest{
    int64 version = 1;
}
message PullVariableResponse{
    bool model_init_status = 1;
    Model model = 2;
}
message PushGradientRequest{
    int32 model_version = 1;
    repeated Tensor gradients = 2;
}
message PushGradientResponse{
    bool accepted = 1;
    int32 model_version = 2;
}
message PullEmbeddingVectorRequest{
    string name = 1;
    repeated int64 ids = 2;
}
message SynchronizeEmbeddingRequest {
    int32 replica_index = 1;
}
message SynchronizeEmbeddingResponse {
    repeated Tensor embedding_vectors = 1;
}

RPC Definition

service PServer{
    # pull trainable tensorflow variables created by Keras layers
    rpc pull_variable(PullVariableRequest) returns (PullVariableResponse);

    # pull embedding vectors in ElasticDL embedding layers
    # Do we need to create a new message `PullEmbeddingVectorRequest` rather than use `Tensor`?
    rpc pull_embedding_vector(PullEmbeddingVectorRequest) returns (Tensor);

    # push trainable tensorflow variables and meta info for ElasticDL embedding layers
    rpc push_model(Model) returns (google.protobuf.Empty);

    rpc push_gradient(PushGradientRequest) returns (PushGradientResponse);

    # PS to recover embedding vectors after relaunch
    rpc get_replica(SynchronizeEmbeddingRequest) returns (SynchronizeEmbeddingResponse);

    # PS replica synchronization
    rpc synchronize_embedding(SynchronizeEmbeddingRequest) returns (SynchronizeEmbeddingResponse);
}

Data Structure

class Tensor(object):
    def __init__(self, name=None, value=None, indices=None):
        self.name = name
        self.value = value
        self.indices = indices
    @classmethod
    def from_tensor_pb(cls, tensor_pb):
        """Create an ElasticDL Tensor object from tensor protocol buffer.
        Return the created tensor object.
        """
        pass
    def to_tensor_pb(self):
        pass
    def to_tf_tensor(self):
        pass
    def to_ndarray(self):
        pass

def serialize_to_pb(tensor, pb):
    """Serialize ElasticDL Tensor to tensor protocol buffer."""
    pass
def deserialize_from_pb(pb, tensor):
    """Deserialize tensor protocol buffer to ElasticDL Tensor."""
    pass
def tensor_pb_to_ndarray(tensor):
    """Deserialize tensor protocol buffer and return a numpy ndarray."""
    pass
def tensor_pb_to_tf_tensor(tensor):
    """Deserialize tensor protocol buffer and return a TensorFlow tensor."""
    pass
# In `Parameters`, interfaces `set_*_param` have two arguments, `value` and `name` (or `layer_name`).If `value` is a ElasticDL `Tensor` instance, `name` can be None.Otherwise `value` is a numpy ndarray, and `name` must be specified.
class Parameters(object):
    def __init__(self):
        # Parameter initialization status
        self.parameter_init_status = False
        # Non-embedding parameter dict, maps parameter name to tf.Variable instance
        self.non_embedding_params = {}
        # Embedding table dict, maps embedding layer name to `EmbeddingTable` instance
        self.embedding_params = {}

    @property
    def non_embedding_params(self):
        return self._non_embedding_params
    def set_embedding_param(self, value, layer_name=None):
        pass
    def get_embedding_param(self, layer_name, ids):
        return self._embedding_params.get(layer_name).get(ids)
    def set_non_embedding_param(self, value, name=None):
        pass
    def init_non_embedding_param(self, value, name=None):
        pass
    def set_meta_info(self, layer_name, dim, initializer):
        pass

class EmbeddingTable(object):
    def __init__(self, dim, initializer):
        # Embedding vector dict, maps ID to 1-D numpy.ndarray
        self._embedding_vectors = {}
        # the dimension of embedding vectors
        self._dim = dim
        # the initializer name for initializing embedding vectors
        self._initializer = initializer

    def get(self, ids):
        values = []
        for id in ids:
            if id not self._embedding_vectors:
                val = initialize_embedding_vector(self._dim, self._initializer)
            else:
                val = self._embedding_vectors.get(id)
            values.append(val)
        return np.concatenate(values).reshape(len(ids), -1)

    def set(self, ids, values):
        pass

Here is the pseudocode for a worker to pull variable from the PS. If the non-embedding variables are not initialized, the PS will tell the worker to initialize them and report to the PS.

class PServer(elasticdl_pb2_grpc.PServerServicer):
    ...
    def pull_variable(self, request):
        res = PullModelResponse()
        if self._need_initialize_model:
            res.model_init_status = True
            return res
        res.model_init_status = False
        res.model = self._get_model() # get model in this PS instance
        return res

    def push_model(self, request):
        model = request.model
        ... # initialize model in this PS instance

pull和push是ps中的重要操作。

class Worker(object):
    ...
    def pull_variable(self):
        # for-loop should be implemented in multithread
        for ps_index in range(self._ps_node_num):
            req = PullModelRequest() # create request code keeps the same with current code
            res = self._stub[ps_index].pull_variable() # pull variable from PS
            if res.model_init_status:
                # worker initializes its model here if needed
                model = serialize_model_to_pb()
                self._stub[ps_index].push_model(model) # get model in this worker
            req = PullModelRequest() # create request code keeps the same with current code
            res = self._stub[ps_index].pull_variable() # pull variable from PS
            if res.model_init_status:
                raise Error or try a pre-defined constant times

《用python实现深度学习框架》——ps实现

ps 架构的参数放在 ps 节点上,worker和ps直接通过 rpc 进行push/pull。《用python实现深度学习框架》给出的ps/worker 模型的grpc 通信api定义

service ParameterService{
  # ps 接收从各个worker 发送过来的请求,拿到各个节点的梯度,并根据节点名称累加到相应的缓存中
  # Push接口,各节点上传梯度
  rpc Push(ParameterPushReq) returns (ParameterPushResq){}
  # Pull接口,各节点下拉梯度
  rpc Pull(ParameterPullReq) returns (ParameterPullResq){}
  # 变量节点初始化接口
  rpc VariableWeightsInit(VariableWeightsReqResp) returns(VariableWeightsReqResp){}
}
# ps server 实现
class ParameterService(xxx):
  def Push(self,push_req...):
    node_with_gradients,acc_no = self._deserialize_push_req(push_req)
    if self.sync:
      self._push_sync(node_with_gradients,acc_no)
    else:
      self._push_async(node_with_gradients,acc_no)
  def _push_sync(self,node_with_gradients,acc_no):
    if self.cond.accquire():
      # 等待上一轮所有worker 等下拉完成
      while self.cur_pull_num != self.worker_num:
        self.cond.wait()
      # 记录上传次数
      self.cur_push_num +=1
      # 把梯度更新到缓存
      self._update_gradients_cache(node_with_gradients)
      # 累计梯度数量
      self.acc_no += acc_no
      if self.cur_push_num >= self.worker_num:
        self.cur_pull_num = 0
        self.cond.notify_all()
      self.cond.release()
    else
      self.cond.wait()
# main 实现
def train(worker_index):
  ...
if __name__ == '__main__':
  # 解析参数
  if role == 'ps':
    server = ps.ParameterServiceServer(...)
    server.serve()
  else:
    worker_index = args.worker_index
    train(worker_index)
class Trainer(object):
  def train_and_eval(self,train_x,train_y,test_x=None,test_y=None):
    # 初始化权重变量
    self._vaiable_weights_init()
    # 开始模型训练
    ##  第一层循环,迭代epoches轮
    for self.epoch in range(self.epoches):
      ## 模型训练
      self.train(train_x,train_y)
      ## 模型评估
      if self.eval_on_train:
        ...
  def train(self,train_x,train_y):
    # 遍历训练数据集
    for i in range(len(list(train_x.values())[0])):
      self.one_step(self._get_input_values(train_x,i),train_y[i])
      if (i+1) % self.batch_size == 0:
        self._optimizer_update()
  # 执行一次前向传播和一次反向传播
  def one_step(self, data_x,data_y,is_eval=False):
    ## 根据输入节点的名称,从输入数据dict中找到对应数据
    for i in range(len(self.inputs)):
      input_value = data_x.get(self.inputs[i].name)
      self.inputs[i].set_value(np.mat(input_value).T)
    ## 标签赋值给标签节点
    self.input_y.set_value(np.mat(data_y).T)
    ## 只有在训练阶段才执行优化器
    if not is_eval:
      self.optimizer.one_step()
    @abc.abstractmethod
    def _variable_weights_init(self):
      raise NotImplementedError()
    @abc.abstractmethod
    def _optimizer_update(self):
      raise NotImplementedError()

《用python实现深度学习框架》在分布式场景下,以ps 模式为例,只要定义一个 DistributedTrainer 实现_variable_weights_init 和 _optimizer_update 方法即可实现一个针对PS 架构的训练器。PS:allreduce 机制也大致类似,可以查看书中示例

class DistTrainerParameterServer(Trainer):
  def __init__(self,*args,**kargs):
    Trainer.__init__(self,*args,**kargs)
    cluster_conf = kargs['cluster_conf']
    ps_host = cluster_conf['ps'][0]
    self.ps_client = ps.ParameterServiceClient(ps_host)
  def _variable_weights_init(self):
    var_weight_dict = dict()
    for node in default_graph.nodes:
      if isinstance(node,Variable) and node.trainable:
        var_weight_dict[node.name] = node.value
    # 把自己的weight 初始值发给ps
    duplicated_var_weight_dict = self.ps_client.variable_weights_init(var_weights_dict)
    # 使用ps返回的初始值,重新初始化本地参数
    for var_name,weights in duplicated_var_weight_dict.items():
      update_node_value_in_graph(var_name,weights)
    print('[INIT] worker variable weights initialized')
  def _optimizer_update(self):
    # 把当前梯度上传到ps 上,此操作可能会block,直到所有节点都上传完成
    acc_gradient = self.optimizer.acc_gradient
    self.ps_client.push_gradients(acc_gradient,self.optimizer.acc_no)
    # 从ps那里把所有节点的平均梯度都拉回来,此操作可能会block,直到所有节点都下拉完成
    node_gradient_dict = self.ps_client.pull_gradients()
    # 使用得到的平均梯度,利用优化器的优化算法,更新本地变量
    self.optimizer.update(node_gradient_dict)

ps tensorflow实现

在TensorFlow 的实现中,Tensor对象及内部 TensorBuffer 对象本身始终保存在 CPU 内存上,TensorBuffer 内部指针指向的缓冲区有可能位于CPU 或GPU 内存,Tensor 的跨设备复制主要指 内部缓冲区的复制过程

  1. CPU 内存间的复制无需使用特殊函数,对于TensorBuffer 指向的缓冲区不做完整复制,只需复制指针并增加引用计数。
  2. CPU 与GPU 内存之间的数据复制 函数 需要不同GPU 设备的DeviceContext 子类重写。

Tensorflow使用gRPC 实现进程间通信,分布式模式中的每个进程具有一个 GrpcServer 实例(也包含客户端对象),包含GrpcMasterService 和 GrpcWorkerService 两个服务

  GrpcMasterService主要用于会话相关操作 GrpcWorkerService 主要用于数据流图相关操作
控制接口 CreateSession,ExtendSession,… GetStatus,RunGraph
数据通信接口   RecvTensor
  1. 服务端均以异步模式实现,master 服务的客户端调用多为同步模式,worker 服务的客户端调用多为异步模式
  2. 对于同一进程内的 grpc 调用,均有可能直接访问本地对象
  3. RecvTensor 有一定特殊性,Tensor 的大小可能很大,自定义了消息传输与解析格式(没有用grpc 自带的),客户端调用的生命周期管理也有所不同
  4. 为进一步提高进程间数据通信性能,在保留gRPC 通信的同时,利用TensorFlow 的扩展机制,支持RDMA 特性的InfiniBand 协议栈,上层使用与grpc 接口一致,只需开启相关配置即可。

Paddle——ps实现

Paddle计算图拆分与优化 深度学习网络中有两个基本元素:Operator(比如加减/FC/Embedding查表) 和 Variable(分为稠密参数和稀疏参数) 单机的计算图执行流程:

  1. 计算图拿到参数的值(Var)之后,会首先执行前向OP(FF OP),OP可能会执行在不同的设备上,如CPU/GPU/Kunlun 或其他AI芯片,我们用XPU代指。
  2. 前向OP计算完成后,得到loss,会继续计算反向OP(BP OP),得到各个参数的梯度(Var_Grad)
  3. 指定SGD/Adam等优化器,利用参数的梯度(Var_Grad)更新原始参数(Var)
  4. 重复以上流程,迭代参数的更新,实现深度学习网络的训练

  1. Worker(Trainer)在计算得到参数的梯度(Var_Grad)后,会通过RPC发送给PServer
  2. PServer监听通信端口,将收到的不同参数分别通过不同的Oprimizer OP完成更新。 PS: 参数更新放在ps的问题是 添加新的优化算法必须修改 PS 的实现
  3. Worker在下一个迭代时,请求PServer上最新的参数
  4. 重复以上流程,迭代参数的更新,实现分布式参数服务器的训练

Paddle计算图拆分与优化参数服务器模式下,所有参数的全局真值都被分片保存在各个Pserver上,PServer执行Optimizer OP(另一种选择是 ps 只是汇总梯度,weight还是worker 更新的),执行参数的更新。以PS-CPU异步模式的计算图介绍PServer的计算图

进程内通信主要实现内存数据的跨设备复制(本地CPU和GPU内存之间、多个GPU内存之间), 进程间通信主要实现数据流和控制流基于网络协议的传输。Tensorflow 的运行时框架会根据应用代码中的集群和设备配置,在数据流图构建时自动插入通信操作,并在数据流图执行过程中动态调度这些操作。

  1. 对于逻辑上具有数据传输语义,但物理上通信的源和目标位于同一设备、同一地址空间的进程内通信,只传输数据指针,不传输数据本身。
  2. 对于cpu 和gpu 内存间的设备通信,本质是跨设备内存的数据复制,尽量使用底层设备接口实现(比如DMA机制),而不是用面向应用层的API
  3. 涉及GPU的内存传输一般需要经过CPU 内存中转,除非使用支持RDMA 协议的高性能网络。
  4. 尽量做到 计算与通信过程重叠,很多通信函数采用异步模式及多线程并发执行

与allreduce 对比

机器学习参数服务器ps-lite (1) —– 历史溯源在参数服务器之前,大部分分布式机器学习算法是通过定期同步来实现的,比如集合通信的all-reduce,或者 map-reduce类系统的reduce步骤。当async sgd出现之后,就有人提出了参数服务器。

分布式常用的2种模式有ParameterServer 和 AllReduce/RingAllReduce。随着开源框架的火热迭代,再者 GPU 显存也越来越大,AllReduce 模式研究的更多一点。毕竟大多数研究的还是 dense 模型,就算上百层的网络,参数也大不到哪去。所以很多训练都是数据并行,每个节点可以存储完整的模型参数。但是像 CTR 这类用到大规模离散特征的,本质也是一个大规模离散模型,一般称为 sparse 模型。几百 G 的模型很常见,一个节点也不太可能放的下。这时候 parameter Server 模型更加适用一些。

embedding/稀疏场景优化

一文梳理推荐系统中Embedding应用实践在自然语言中,非端到端很常见,因为学到一个好的的词向量表示,就能很好地挖掘出词之间的潜在关系,那么在其他语料训练集和自然语言任务中,也能很好地表征这些词的内在联系,预训练的方式得到的Embedding并不会对最终的任务和模型造成太大影响,但却能够「提高效率节省时间」,这也是预训练的一大好处。但是在推荐场景下,根据不同目标构造出的序列不同,那么训练得到的Embedding挖掘出的关联信息也不同。所以,「在推荐中要想用预训练的方式,必须保证Embedding的预训练和最终任务目标处于同一意义空间」,否则就会造成预训练得到Embedding的意义和最终目标完全不一致。比如做召回阶段的深度模型的目标是衡量两个商品之间的相似性,但是CTR做的是预测用户点击商品的概率,初始化一个不相关的 Embedding 会给模型带来更大的负担,更慢地收敛。

阿里开源自研工业级稀疏模型高性能训练框架 PAI-HybridBackend在PS架构下使用GPU设备训练的两个问题。

  1. 变化的硬件资源瓶颈。疏训练主要由Embedding阶段、特征交叉(feature interation)阶段和多层感知器(MLP)阶段组成,Embedding阶段在PS范式的训练下占据了至少50%以上的训练时间。经过分析发现,Embedding阶段的算子主要以访存密集型(memory access intensive)和通信密集型的算子(communication intensive)为主,主要需要的硬件资源是内存和网络的带宽,而后两个阶段的算子则是计算密集型的算子占主导,需要的资源是算力。这意味着在PS的范式训练下,任何一个阶段都有可能存在某一种硬件资源成为瓶颈而其他硬件资源被浪费的现象。GPU使用率(SM Util)在不同的训练阶段之间呈现脉冲式变化(pulse)
  2. 算子细碎化(fragmentation),生产实际中的模型往往拥有上百路的Embedding特征查询,每一路的特征查询在TensorFlow内都会调用数十个算子操作(operations)。TensorFlow的引擎在调度上千级别的大量的算子操作需要额外的CPU线程开销;对于GPU设备来说,过多的 CUDA kernel 提交到流处理器上(TensorFlow下每个GPU设备只有一个stream抽象)带来了GPU Stream Multiprocessor (SM)的调度开销,同时每个算子处理数据的并发度又不高,从而很难打满GPU的计算单元。

HybridBackend架构中参数放在 worker 上,不同的稀疏特征,比如说ID类型的特征,相对来讲会非常大,对于有些特征,它的特征的量非常少,比如说省份、性别这种类型的特征。不同的特征,它的同步方式是有区别的

  1. 稠密参数 replication 存放,每个 worker 都有副本,梯度更新时进行 allreduce;
  2. 稀疏参数 partition 存放,每个 worker 只有部分分片,梯度更新时进行 alltoall。

allreduce 和 alltoall 都会使用 nccl 进行同步通信,效率较高。hb 进行 alltoall 时,通信的是稀疏梯度,而不是完整的参数,通信量上和 ps 是一致的,但是通信效率会更高。

TensorFlow在推荐系统中的分布式训练优化实践在原生的TensorFlow中构建Embedding模块,用户需要首先创建一个足够装得下所有稀疏参数的Variable,然后在这个Variable上进行Embedding的学习。

user_col = tf.feature_column.categorical_column_with_identity(key='userId', num_buckets=30001)
user_emb_col = tf.feature_column.embedding_column(user_col, 10)

然而,使用Variable来进行Embedding训练存在很多弊端:

  1. Variable的大小必须提前设定好,对于百亿千亿的场景,该设定会带来巨大的空间浪费;
  2. 训练速度慢,无法针对稀疏模型进行定制优化。 我们首先解决了有无的问题,使用HashTable来替代Variable,将稀疏特征ID作为Key,Embedding向量作为Value。相比原生使用Variable进行Embedding的方式,具备以下的优势:
  3. HashTable的大小可以在训练过程中自动伸缩,避免了开辟冗余的存储空间,同时用户无需关注申请大小,从而降低了使用成本。
  4. 针对HashTable方案实施了一系列定制优化,训练速度相比Variable有了很大的提高,可以进行千亿规模模型的训练,扩展性较好。

核心流程大致可以分为以下几步:

  1. 稀疏特征ID(通常我们会提前完成统一编码的工作)进入Embedding模块,借助TensorFlow搭建的Send-Recv机制,这些稀疏特征ID被拉取到PS端,PS端上的Lookup等算子会实际从底层HashTable中查询并组装Embedding向量。
  2. 上述Embedding向量被Worker拉回进行后续训练,并通过反向传播计算出这部分参数的梯度,这些梯度进一步被位于PS端的优化器拉回。
  3. PS端的优化器首先调用Find算子,从HashTable获取到梯度对应的原始稀疏参数向量和相应的优化器参数,最终通过优化算法,完成对Embedding向量和优化器参数的更新计算,再通过Insert算子插入HashTable中。

系统架构

Parameter Server分布式训练概述(下篇)

XDL

XDL 包含 server和worker两类节点,既可以是物理节点,即server和worker分别部署在不同的物理机上,也可以是逻辑节点,即server和worker以进程的方式部署在同一台物理机上。后者是目前ps的主流做法,好处是减少了跨物理机之间的通信。

训练过程包括前向和后向计算。前向计算中,worker获取训练数据,给server发送特征ID请求,server查找对应的embedding向量并返回给worker,同时worker还要从server拉取最新的dense net,worker对embedding向量做pooling,例如求和或者平均,然后送入dense net进行计算。这部分还是worker的pull操作。后向计算,worker计算出sparse net和dense net的梯度之后发送给server,server收集到所有梯度之后,采用sgd/adagrad等各类优化算法对权重或参数做更新。这部分是worker的push操作。对于一些比较复杂的优化算法,例如,adagrad,adam等,训练过程需要累积梯度或者梯度平方项,这些累积项都存储在server中,server侧基于这些累积项和收集到的梯度,对模型权重做更新。PS: 对应的代码是哪块? 有助于分析一些框架与tf的关系

对于sparse net来说,因为特征规模庞大,所以需要较大的存储空间,又由于特征稀疏的特点,所以每一次只需要查找少量的embedding向量。所以sparse net具有存储大,但是I/O小的特点。dense net占用的存储空间较小,但是每一次训练需要拉取全部的参数,所以dense net对内存要求不大,但是对I/O要求很高。XDL采用不同的方式对两种网络进行管理。对于sparse net,使用hashmap做key-value存储,并且将所有的embedding向量均匀分布在各个server节点中,这对于缓解单个server节点的存储压力和通信都有帮助。dense net同样也均匀分割到所有server节点中。

在生产环境中容错是必不可少的,否则如果机器出现故障导致从头开始训练,会带来时间和资源的浪费。在训练过程中,server会保存模型的快照,可以理解为checkpoint。调度器会根据心跳来监控server的状态,如果检查到server出现异常,任务就会中断,等到server恢复正常后,调度器会通知server加载最近一次的快照继续训练。对于worker来说,训练过程中所有worker会把数据的读取状态上传到server中,如果worker失败之后恢复正常,worker会从server请求读取状态,在数据中断的地方继续训练,这样可以避免样本丢失,或者重复训练等问题。

tensornet

参见 tensornet源码分析

扩展tf

tf的分层结构(图-> 算子) 和 系统结构(master/worker 分别负责拆分和执行)都是分布式计算的常见设计(spark/flink也是类似),没啥问题,优化点一般在通信(同步异步、协议)、存储((sparse,存在独立ps 还是分布式)上。有两类

  1. 仅暴露给算子/在算子层以下优化。比如ps 、存储、协议等都在算子层以下
  2. 自定义算子。自定义算子了,还得确保能用上,一般会涉及到 自定义Model(包括Layer)、Optimizer 等,在自定义的 高层python api 上用到这些算子。除此之外,一些 c的一些逻辑 可能会直接暴露给python 层被使用,比如初始化底层的一些组件做一些准备工作(此时就是单纯的python 调用c,就不是自定义算子了)。
  3. python api 层扩展支持 EmbeddingVariable,主需要配套的c 支持

扩展的目的

  1. 支持大模型,主要是embedding 参数,基于tensorflow做扩展支持大模型的做法
  2. 训练加速,比如提高通信效率等

第一视角:深度学习框架这几年

  1. 推荐场景在电商,视频,资讯等众多头部互联网公司的火爆导致推荐系统对AI硬件的消耗在互联网公司超过了传统NLP,CV,语音等应用的总和。
  2. 无量一开始采用的是基于参数服务器的架构。对tf 的改造有两个方向
    1. 把TensorFlow作为一个本地执行的lib,在外围开发,TensorFlow被复用来提供Python API以及完成基础算子的执行。而参数服务器,分布式通信等方面则是自己开发,没有复用TensorFlow。
    2. 基于TensorFlow底层进行改造,研发难度会比较大,而且很可能与社区版TensorFlow走向不同的方向,进而导致TensorFlow版本难以升级。

阿里巴巴开源大规模稀疏模型训练/预测引擎DeepRec在TensorFlow引擎上支持大规模稀疏特征,业界有多种实现方式

  1. 最常见的方式是借鉴了ParameterServer的架构实现,在TensorFlow之外独立实现了一套ParameterServer和相关的优化器,同时在TensorFlow内部通过bridge的方式桥接了两个模块。
  2. TensorFlow是一个基于Graph的静态图训练引擎,在其架构上有相应的分层,比如最上层的API层、中间的图优化层和最下层的算子层。
    1. 基于存储/计算解耦的设计原则在Graph层面引入EmbeddingVariable功能;
    2. 基于Graph的特点实现了通信的算子融合。
  3. TensorFlow 模型准实时更新上线的设计与实现 针对已有算子 重新实现(比如tensor保存在第三方kv),然后将graphdef 中已有算子替换为自己实现的对应算子。

有的时候,一些框架也不是在扩展tf,而是把 tf 当时一个库在用(用它的前向和反向)。扩展方式五花八门的,不像 k8s 有明确的csi/cri/cni 接口要遵守