简介
- 简介
- 弹性背景
- DLRover
- train_script 的守护者elastic agent
- rendezvous/集会机制/实现服务发现
- elastic agent 源码
- 容错
- 与horovod 对比
- 其它
弹性背景
实现弹性训练需要面对哪些挑战和难点
- 需要一个节点/进程之间彼此发现的机制。
- 如何处理成员变更
- 如何捕获单个进程训练失败,如何在单个节点上管理所有训练进程。
- 如何与现有训练代码集成。
DLRover
阿里云 ACK 云原生 AI 套件中的分布式弹性训练实践什么是弹性训练?具体可以总结为三大块的能力:
- 训练规模弹性改变:这里主要指的是弹性改变训练的 Worker 数目,扩容增加 Worker 数量以提升训练速度,缩容减少 Worker 数量以腾出部分集群资源;
- 训练过程弹性容错:由于部分因素导致任务异常或可预见问题如 Spot 回收事件预示下的整体任务的容错,避免因为少部分 Worker 失败而直接导致的整个任务的失败;
- 训练资源弹性伸缩:可以根据任务需要或者问题处理的判断来动态变更任务训练 Worker 的资源配置以达到一个更合理的任务 Worker 资源配比。
而弹性训练的能力带来的意义,大概也可以总结为三点:
- 大规模分布式训练容错,有效提升训练任务运行的成功率;
- 提升集群算力利用率,充分利用弹性来协调在离线任务的资源分配;
- 降低任务训练成本,使用可被抢占或稳定性稍差但成本更低的实例来进行训练从而整体层面降低成本。
分布式训练一般分为两种类型,数据并行和模型并行。基于数据并行的分布式训练又分为两种不同的架构。
-
Parameter Server 架构。在 PS 模式下进行的弹性训练,由于其为异步模式,弹性的关键在于训练数据的划分。当其中一部分 Worker 失败之后,未被训练的数据或者失败 Worker 中的数据可以被剩下的 Worker 继续训练,当新的 Worker 加入之后,可以与现有的 Worker 一起参与进行训练。
在蚂蚁 AI Infra 团队开源的项目 DLRover 中,其实现了 Training Master 来参与弹性训练。由 Training Master 来负责对任务的监听、数据集的划分、各角色资源的弹性。其中数据集的划分是弹性训练的关键,在 Master 中有一个 Dataset Shard Service 的角色来负责具体数据集的划分。其将整个的数据集按照 Batch Size 进行切分,分为各个 Task Data,然后将 Task Data 放进数据集队列中供各个训练 Worker 进行消费。在 Dataset Shard Service 中存在着两个队列,所有未被训练消费的 Task Data 在 TODO 队列中,而正在被训练的 Task Data 则是在 DOING 队列,直到该 Data 训练结束 Worker 给出信号后,该 Task Data 才会完全出队。如果有训练 Worker 中途异常退出,检测超时的 Task Data 会重新进入 TODO 队列以供其他正常 Worker 进行训练。
-
AllReduce 架构,在 AllReduce 模式下进行的弹性训练,由于其为同步模式,弹性的关键在于如何保证训练的同步,同时还有为了同步梯度而建立起来的通信环的保持。当其中一部分 Worker 失败之后,剩下的 Worker 可以重建通信环继续训练,当新的 Worker 加入之后,可以与现有的 Worker 重建通信环进行训练。
DLRover 在 Kubernetes 上设计了一个 CRD ElasticJob,由 ElasticJob Controller 监听并创建一个 DLRover Master,再由该 Master 来创建 PS 和 Worker 的 Pod,并控制 PS 和 Worker 的弹性。
不同场景对应不同的crd: ElasticJob ==> ps/worker; PytorchJob ==> Pytorch allreduce; TrainingJob+ScaleIn+ScaleOut ==> Elastic Horovod。
DLRover 下的角色节点主要分为两种,Master 和 Worker。Master主要承担框架控制层面的工作,例如创建 Worker,运行时的容错实施等。Worker 主要承担实际训练的工作,会由 Worker 的主进程初始化训练 Agent,再由 Agent 拉起分布式训练进程进行训练。
DLRover 以 ElasticJob CRD 的形式将作业提交到集群。收到 CRD 后,ElasticJob Operator 会拉起一个 Master Pod 作为 Elastic Trainer。其从 Brain 服务中获取初始资源计划。Elastic Trainer 用它来创建 Scale CRD,并应用 Scale CRD 通知 ElasticJob Controller 启动所需的 Pod,每个 Pod 将在其上启动一个 Elastic Agent。在训练过程中,Elastic Trainer 的 Training Master 将数据分片分发给 Worker。同时,Cluster Monitor 监控每个作业的运行状态(i.e.每个节点的 Workload)和集群状态(i.e. 资源水位)。这些数据将定期报告给 Brain,Brain 将数据持久化到数据库中。然后 DLRover Brain 根据作业的运行状态,选择合适的算法生成新的资源计划,并通知 Elastic Trainer 开始资源调整。
- 自动资源推导:用户提交分布式作业时无需提供任何资源信息(不用配cpu/mem),帮助用户自动初始化训练资源,提升资源利用率与作业稳定性。
- 动态训练数据分片:针对不同 Worker 性能不通造成的木桶效应,根据实际消费速度分配训练数据,可配合 Failover 记录消费位点,数据不丢失。混部集群存在资源超卖和抢占的情况,部分节点消费数据慢,快节点需要等待慢节点,降低训练速度。DLRover 可以通过数据动态分发给慢节点少分发一些数据,减少等待。当扩容或者缩容时,需要有个全局协调者知道记录节点当前消费数据详情。当节点失败重启后,全局协调者需要知道节点已经消费和尚未消费的数据。如果这些逻辑让训练节点来做,训练节点和训练节点之间需要交互,增加训练节点逻辑的复杂性。DLRover Master 充当了这个全局协调者的角色。训练节点只管从 DLRover Master 获取 Shard,然后读取数据,不需要处理其他的逻辑。
- 单点容错:
- 自动节点检测是指在训练中断时,系统会自动进行硬件和通信检测,识别并隔离故障节点,随后启动新的节点替代。例如,当训练进程报错退出,而 worker 本身还在正常运行时,通常的做法是在原 worker 上重新拉起训练进程。绝大多数的问题,都可以通过重新拉起训练进程解决。但某些情况下,多次重试(例如,3 次)都失败了,会判定这个机器是故障机,排除掉这个机器并在新机器上重启训练。在大模型训练中,如果有一个故障机器,重启3次训练进程,至少需要 30 分钟。如果一开始就能判断出当前错误无法通过重启训练进程解决(例如当前机器出现掉卡错误),而需要新机器,就可以节约大量的时间。DLRover 的容错系统不仅依赖训练进程和 worker 的状态来判断训练状态,也通过分析训练日志,训练栈,芯片状态等来进行精确分析。
- 提供单点容错的能力,不需要完整重启作业。例如集群中,很常见的一类错误是由于用户配置了不足的内存,导致训练 OOM。在 DLRover 的帮助下,我们可以自动拉起一个优化配置的节点来恢复失败的 Node。
- 资源弹性:支持运行时 Pod 级和 CPU/Memory 级的资源弹性扩缩容,动态全局优化决策。通过监控作业节点的 Workload,DLRover 可以分析资源配置的瓶颈。常见的资源瓶颈有:节点抢占、Workload 不平衡、CPU 不足导致算力低下、节点数目不足。DLRover 可以通过动态的资源热更新来持续优化训练性能。通常不同的模型训练作业,需要不同的资源配置。然而用户倾向于超额配置作业的资源以保障作业的成功率。这通常会导致大量的资源浪费。DLRover 的自动扩缩容能力,可以自动根据作业的真实需求配置资源,以最少的资源达到最优的训练性能,从而减少资源浪费。
- DLRover 支持用户使用任何自己的训练框架,底层训练代码通过提供约定的 API 接口以实现自动弹性扩缩等需要同底层分布式代码深度交互。集群中部署完成后,终端算法同学基本可以无感接入。
train_script 的守护者elastic agent
python -m torch.distributed.run train_script.py
对于每一个node 有两个角色
- run.py 负责启动 elastic agent
- elastic agent 负责启动 train_script.py, 并给train_script.py 传递必要的参数(环境变量或参数形式,由脚本的获取方式决定)。
每个Node 上运行一个 Agent,Agent是一个worker manager,包含一个 rendezous,负责分布式协商,Agent 同时负责启动workers,监控 workers,捕获失效 workers,如果有故障/新加入worker,则重启 worker。Agent负责维护 WORLD_SIZE 以及 RANK 信息。用户不需要再手动提供,Agent会自动处理这些。
PyTorch 分布式之弹性训练(1) — 总体思路PyTorch Elastic Trainer (PET) 提供了一个可以用容错和弹性方式跨集群来训练模型的框架。PS: 还与Horovod 进行了对比。
- PET 使用了一个名为elastic-agent的新进程,每个节点有一个独立的elastic-agent。每个代理进程只负责管理该节点的一组本地工作进程,并与本作业其他节点上的弹性代理一起协调来确定进程组成员身份的变化。
- 成员变更的处理方式如下:
- 当一个工作进程失败时,管理它的弹性代理会杀死该节点上的所有worker,然后与其他代理建立一个集合操作(rendezvous),并使用新的集合信息来重启worker。
- 当代理以非零错误代码退出时,应该由上层调度模块(例如 Kubernetes)来重新启动代理(同理,此代理将重新启动它负责的所有worker)。
- 相同的恢复机制也适用于节点级故障。编排工具(诸如 Kubernetes )会调度作业以便job可以使用最小数目的代理副本运行,然后每个代理将依次编排用户的训练脚本。
- PET 尝试维护工作进程的数量,使它们保持在作业所需的
[min,max]
范围内。一旦发生故障或成员变更,所有幸存的worker将立即被杀掉。所以用户需要手动地处理 checkpoint,定期保存你的工作进度,来保证重启后训练能够继续下去。PET不强制指定如何管理checkpoints。应用编写者可以任意使用torch.save 和 torch.load 或更高层次的框架如PyTorch Lightening 进行处理,checkpoint的频率应取决于用户job对于失败的容忍度。
rendezvous/集会机制/实现服务发现
Agent 是具体节点上的后台进程,是独立个体。需要一个机制来完成 worker 之间的相互发现,变更同步等等(WORLD_SIZE 和 RANK 这些信息其实也需要多个节点同步才能确定),这就是下面的 Rendezvous 概念。
深度学习分布式训练框架 horovod (3) — Horovodrun背后做了什么Gloo 机制工作时,需要从env 中获取到 RendezvousServer 信息以便进行 Collective communication
设计
Rendezvous 负责集群逻辑,保证节点之间对于”“有哪些节点参与训练”达成强一致共识。
- 每一个 Agent 内部包括一个 Rendezvous handler,这些 handler 总体上构成了一个 Rendezvous 集群,从而构成了一个 Agent 集群。
- Rendezvous 完成之后,会创建一个共享键值存储(shared key-value store),这个store实现了一个torch.distributed.Store API。此存储仅由已完成Rendezvous的成员共享,它旨在让Torch Distributed Elastic在初始化作业过程之中交换控制和数据信息。
- Rendezvous 负责在每个agent之上维护当前 group 所有相关信息。每个 agent 之上有一个 rendezvous,它们会互相通信,总体维护一套信息,这些信息存储在上面提到的Store 之中。
- Rendezvous 负责集群逻辑相关,比如新加入节点,移除节点,分配rank等等。
PyTorch 分布式之弹性训练(4)—Rendezvous 架构和逻辑Rendezvous会提供以下细分功能。
- Barrier, 执行会合的节点将全部阻塞到 rendezvous 完成,即至少有min个节点(针对同一作业)已加入到Barrier,这也意味着对于固定大小的节点数目,barrier是不必要的。在达到”min”数量后,rendezvous 不会立刻宣布完成,而是会等待额外的一小段时间,这用来保证rendezvous不会”过快”完成,因为如果立刻完成,就会错过那些加入时只慢了一点点的节点。当然如果在Barrier处聚集了max个节点,则rendezvous立即完成。另外,还有一个总超时时间配置 :如果在超时时间之内 min个节点一直没有达到,则会导致 rendezvous 失败,这是一个简单的故障安全(fail-safe)解决方案,用来帮助释放部分分配的作业资源,防止资源浪费。
- 排他性(Exclusivity),如果一组节点已经完成rendezvous(可能已经在训练),那么其他试图加入的”迟到”节点只会被认为是等待状态,且必须等到现有rendezvous被结束。
- 一致性(Consistency),rendezvous完成后,其所有成员将对工作成员资格以及每个人在其中的角色(role)达成共识。此角色(role)使用一个介于 0 ~ world size 之间的整型来表示,被称之为rank。请注意,rank是不稳定的,比如,同一个的节点在下一次(重新)rendezvous中可能被分配了不同的rank。
- 在 rendezvous 过程中有容错机制:
- 在开始join rendezvous 和 rendezvous 完成之间,如果有进程崩溃(或网络故障等),就会自动引发一个re-rendezvous,剩余健康节点会自动重组。
- 节点也可能在rendezvous 完成后失败(或被其他节点观察到失败),这个场景由Torch Distributed Elastic train_loop 负责,也会触发一个re-rendezvous,训练过程不会中断。
RendezvousHandler
RendezvousHandler = 类似rpc 框架中的注册中心 + 协商逻辑,必要时还要自己 启动注册中心(后面的TCPStore),层层抽象将注册中心的kv 操作 ==> state 操作(Backend) ==> 注册、发现、rank/world_size 协商。PyTorch 分布式之弹性训练(5)—Rendezvous 引擎
Rendezvous 的支撑系统
- RendezvousParameters ,构建RendezvousHandler所需参数。
- RendezvousSettings ,用来存储rendezvous的配置,可以理解为静态元信息。
- _RendezvousState 是rendezvous的状态,是动态信息,每一个 node 都会维护一个本地 state。
- _NodeDesc 是rendezvous的一个节点。
- backend, RendezvousBackend
# /pytorch/torch/distributed/elastic/rendezvous/api.py
# rdzv backend: etcd/etcd-v2/c10d/static
class RendezvousHandler(ABC):
def next_rendezvous(self,) -> Tuple[Store, rank, world_size] # 注册、发现、协商都得用它
def get_backend(self) -> str
def is_closed(self) -> bool
def set_closed(self)
def num_nodes_waiting(self) -> int
def get_run_id(self) -> str
def shutdown(self) -> bool # 监听worker 失效后关闭 本轮rendezvous
pytoch 针对 RendezvousHandler(有多种实现 DynamicRendezvousHandler/StaticTCPRendezvous等) 维护了一个 RendezvousHandlerRegistry,launch_agent ==> rdzv_handler = rdzv_registry.get_rendezvous_handler(rdzv_parameters)
首先 根据backend 确定类型,再根据 rdzv_parameters 对 rendezvous_handler 初始化。
# /pytorch/torch/distributed/elastic/rendezvous/api.py
class RendezvousHandlerRegistry:
_registry: Dict[str, RendezvousHandlerCreator]
def register(self, backend: str, creator: RendezvousHandlerCreator) -> None:
def create_handler(self, params: RendezvousParameters) -> RendezvousHandler:
rendezvous_handler_registry = RendezvousHandlerRegistry()
实际运行发现 rdzv_endpoint 中指定的port 由 python -m torch.distributed.run train_script.py
进程监听,也就是 c10dStore 运行在 elastic agent 上。PS: 代码上看 etcd 系列的会清晰一下。
这里要注意的是,elastic 内部有一套 Rendezvous,和 distributed 原有的 Rendezvous 那套不一样,别搞混了。distributed 原有的 Rendezvous 就是一套简单的 KV 存储。elastic Rendezvous 则要复杂得多。
RendezvousBackend
在 PyTorch 之中,backend 概念指的是当前进程要使用的通信后端,一般来说,支持的通信后端有 gloo,mpi,nccl 。建议用 nccl。在弹性训练这里(Backend概念不一样),Backend 其核心就是一个 Store,用来存储相关信息,通过 set_state 和 get_state 来对 store 进行读写。Rendezvous 的各种同步操作,都是由各个代理连接到这个中心化的 Store,在其上完成。
class RendezvousBackend(ABC):
@abstractmethod
def get_state(self) -> Optional[Tuple[bytes, Token]]
@abstractmethod
def set_state(self, state: bytes, token: Optional[Token] = None) -> Optional[Tuple[bytes, Token, bool]]
pytorch 默认实现了 C10dRendezvousBackend 和 EtcdRendezvousBackend。C10d 后端主要基于一个 TCPStore,通过 TCP 进行同步。TCPStore 是基于 TCP 的分布式键值存储实现(类似于 Redis)。是一个典型的 client-server 架构,服务器存储/保存数据,而存储客户端可以通过 TCP 连接到服务器存储并执行诸如set()插入键值对、get()检索键值对等操作。 所以,对于 c10d 后端来说,在其中一个代理之上会运行 TCPStore Master,其负责监听端口,提供API,Rendezvous 的各种同步操作(barrier/rank rendezvous等),都是由各个代理连接到这个中心化的 TCPStore Master,在其上完成。
我们在构建DynamicRendezvousHandler时候要指定后端(RendezvousBackend)。
store = TCPStore("localhost")
backend = C10dRendezvousBackend(store, "my_run_id") # 配置了后端
rdzv_handler = DynamicRendezvousHandler.from_backend(
run_id="my_run_id",
store=store,
backend=backend,
min_nodes=2,
max_nodes=4
)
Store
Elastic Introduction Elastic Agent 的设计:如何管理多个 worker 进程
Rendezvous 完成后,将创建一个共享键值存储(key-value store)并返回给node。此存储实现了一个torch.distributed.store API,此存储仅由已完成rendezvous的成员共享,被Torch Distributed Elastic用作交换初始化作业控制和数据平面所必需的信息。
Elastic 调用 rdzv_handler.next_rendezvous() 来处理成员关系变化,在 worker 被初始化,或者重启的时候,这一函数都会被调用。其会返回 world size,store等。会把 store 配置到 workgroup 之中,后续worker 之间就可以通过这个kvstore进行沟通。
class Store(__pybind11_builtins.pybind11_object):
def add(self, arg0, arg1)
def compare_set(self, arg0, arg1, arg2)
def delete_key(self, arg0)
def get(self, arg0)
def num_keys(self)
def set(self, arg0, arg1)
def set_timeout(self, arg0)
def wait(self, *args, **kwargs)
”注册中心“可以是c10d/etcd,使用不同的“注册中心”有不同的问题
- c10d 运行在rank0 节点, 因此使用c10d时,非rank0 节点挂掉ok,rank0 节点挂掉会导致训练任务失败
- 使用etcd时,非rank0 节点挂掉ok,rank0 节点挂掉后 其它节点会作为rank0节点,可能会有问题:有些框架喜欢在rank0 做一些特殊工作
elastic agent 源码
agent 总体逻辑如下
- 调用 _initialize_workers 来启动 worker 进程
- 调用 _rendezvous,其内部:调用 next_rendezvous 处理成员关系变化,调用 _assign_worker_ranks 为 worker 建立 ranks。
- 调用 _start_workers 启动 workers。
- 调用 _monitor_workers 监控这些进程的运行结果。 代码结构上,agent 实现了启动、停止、监控worker的逻辑,将rendezvous 逻辑抽象成/委托给 RendezvousHandler负责,agent 就是 worker 管理 与 RendezvousHandler 的协作。
elastic agent 的可扩展性非常好,在 1.9.0 版本中,一共有三个 Agent,分别是 ElasticAgent、SimpleElasticAgent 和 LocalElasticAgent。 其中 ElasticAgent 是一个 Abstract Class,SimpleElasticAgent 对其中的某些函数进行了实现(半成品),而 LocalElasticAgent 则实现了管理单机上所有 worker 进程的 elastic agent。
agent运行
python3.9/site-packages/torch/distributed/run.py
==> ` main() ==>
run(args) ==>
elastic_launch(config=config,entrypoint=cmd,)(cmd_args) ==>
call(args)` ==> launch_agent ==> agent.run ==> SimpleElasticAgent._invoke_run
# /python3.9/site-packages/torch/distributed/launcher/api.py
class elastic_launch:
def __init__(self,config: LaunchConfig,entrypoint: Union[Callable, str, None],):
self._config = config
self._entrypoint = entrypoint
def __call__(self, *args):
return launch_agent(self._config, self._entrypoint, list(args))
def launch_agent(config: LaunchConfig,entrypoint: Union[Callable, str, None],args: List[Any],) -> Dict[int, Any]:
## 构造WorkerSpec 包括worker 启动参数等
entrypoint_name = _get_entrypoint_name(entrypoint, args)
rdzv_parameters = RendezvousParameters(
backend=config.rdzv_backend,
endpoint=config.rdzv_endpoint,
min_nodes=..,max_nodes=..,..)
rdzv_handler = rdzv_registry.get_rendezvous_handler(rdzv_parameters)
master_addr, master_port = _get_addr_and_port(rdzv_parameters)
try:
spec = WorkerSpec(
local_world_size=config.nproc_per_node,
entrypoint=entrypoint,...,
master_addr=master_addr,master_port=master_port,)
agent = LocalElasticAgent(spec=spec, start_method=config.start_method, log_dir=config.log_dir)
result = agent.run() # 启动agent
except Exception:
...
finally:
rdzv_handler.shutdown() # 以 EtcdRendezvousHandler 为例,EtcdRendezvousHandler.shutdown 会在 etcd 上记录 本次rendezvous closed。
初始化/启动worker
# python3.9/site-packages/torch/distributed/elastic/agent/server/api.py
class SimpleElasticAgent(ElasticAgent):
def run(self, role: str = DEFAULT_ROLE) -> RunResult:
try:
return self._invoke_run(role)
except SignalException as e:
...
finally:
self._shutdown()
def _invoke_run(self, role: str = DEFAULT_ROLE) -> RunResult:
spec = self._worker_group.spec
self._initialize_workers(self._worker_group) ## 启动worker
while True:
time.sleep(monitor_interval) ## 每隔 monitor_interval 查看下 worker 的状态
# 监控循环
class SimpleElasticAgent(ElasticAgent):
def _initialize_workers(self, worker_group: WorkerGroup) -> None:
role = worker_group.spec.role
log.info(f"[{role}] Rendezvous'ing worker group")
self._rendezvous(worker_group)
log.info(f"[{role}] Starting worker group")
worker_ids = self._start_workers(worker_group)
for local_rank, w_id in worker_ids.items():
worker = worker_group.workers[local_rank]
worker.id = w_id
worker_group.state = WorkerState.HEALTHY
def _rendezvous(self, worker_group: WorkerGroup) -> None:
spec = worker_group.spec
store, group_rank, group_world_size = spec.rdzv_handler.next_rendezvous()
self._store = store
workers = self._assign_worker_ranks(store, group_rank, group_world_size, spec)
worker_group.workers = workers
worker_group.store = store
worker_group.group_rank = group_rank
worker_group.group_world_size = group_world_size
if group_rank == 0:
self._set_master_addr_port(store, spec.master_addr, spec.master_port)
master_addr, master_port = self._get_master_addr_port(store)
restart_count = spec.max_restarts - self._remaining_restarts
next_rendezvous 方法会调用 rendezvous_barrier。在 rendezvous_barrier 之中,如果底层抛出各种异常,则会捕获,然后调用 init_phase 再次执行一次rendezvous,直到deadline时间到为止。
监控循环
PyTorch 分布式之弹性训练(2)—启动&单节点流程进入 while True 循环,在循环之中:通过 _monitor_workers 定期轮训用户程序运行情况 SUCCEEDED/FAILED/HEALTHY,得到客户进程运行结果,然后依据情况作出判断。
- 如果程序正常结束,则返回。
- 如果程序出错,则重试,即重启所有 workers,如果重试次数达到依然有问题,就结束所有workers。
- 如果节点成员关系有变化,比如scale up就会有新的节点在waiting,这时候就重启所有workers。
class SimpleElasticAgent(ElasticAgent):
def _invoke_run(self, role: str = DEFAULT_ROLE) -> RunResult:
# NOTE: currently only works for a single role
spec = self._worker_group.spec
role = spec.role
self._initialize_workers(self._worker_group) # 启动worker
monitor_interval = spec.monitor_interval
rdzv_handler = spec.rdzv_handler
while True:
assert self._worker_group.state != WorkerState.INIT
# 定期监控
time.sleep(monitor_interval)
# 监控客户程序运行情况
run_result = self._monitor_workers(self._worker_group)
state = run_result.state # 进程运行情况
self._worker_group.state = state
if state == WorkerState.SUCCEEDED:
# 程序正常结束
self._exit_barrier() # 有一个成功了就全部结束
return run_result
elif state in {WorkerState.UNHEALTHY, WorkerState.FAILED}:
# 程序出错
if self._remaining_restarts > 0: # 重试
self._remaining_restarts -= 1
self._restart_workers(self._worker_group) # 进行重启
else:
self._stop_workers(self._worker_group) # 重试次数达到,结束workers
self._worker_group.state = WorkerState.FAILED
self._exit_barrier()
return run_result
elif state == WorkerState.HEALTHY:
# 程序正常运行
# 节点成员关系有变化,比如scale up
# membership changes do not count as retries
num_nodes_waiting = rdzv_handler.num_nodes_waiting()
group_rank = self._worker_group.group_rank
# 如果有新的节点在waiting,就重启所有workers
if num_nodes_waiting > 0:
self._restart_workers(self._worker_group)
else:
raise Exception(f"[{role}] Worker group in {state.name} state")
def _restart_workers(self, worker_group: WorkerGroup) -> None:
"""
Restarts (stops, rendezvous, starts) all local workers in the group.
"""
role = worker_group.spec.role
log.info(f"[{role}] Stopping worker group")
self._stop_workers(worker_group)
worker_group.state = WorkerState.STOPPED
self._initialize_workers(worker_group) # 又执行了一次 _initialize_workers,重新一轮 rendezvous 。
启动worker
PyTorch 分布式之弹性训练(3)—代理_start_workers 方法会调用 start_processes 来启动 worker 进程,默认_start_method 是 “spawn”。也就是启动了多个进程,并行执行用户程序。同时这些进程的运行结果会被监控。start_processes 参数之中,entrypoint和args 是用户命令和参数,entrypoint可以是函数或者字符串。_start_workers 把 start_processes 方法启动多线程的结果保存在 _pcontext 之中,后续就用 _pcontext 来继续控制,比如结束 worker 就是直接调用 _pcontext 的 close方法。
class SimpleElasticAgent(ElasticAgent):
def _initialize_workers(self, worker_group: WorkerGroup) -> None:
role = worker_group.spec.role
self._rendezvous(worker_group) # 启动前先集会获取下数据
log.info(f"[{role}] Starting worker group")
worker_ids = self._start_workers(worker_group) # 启动worker
for local_rank, w_id in worker_ids.items():
worker = worker_group.workers[local_rank]
worker.id = w_id
worker_group.state = WorkerState.HEALTHY
LocalElasticAgent 实现了 _start_workers
,关键就是为 worker 准备环境 变量和参数
# python3.9/site-packages/torch/distributed/elastic/agent/server/local_elastic_agent.py
class LocalElasticAgent(SimpleElasticAgent):
def _start_workers(self, worker_group: WorkerGroup) -> Dict[int, Any]:
spec = worker_group.spec
store = worker_group.store
master_addr, master_port = super()._get_master_addr_port(store)
restart_count = spec.max_restarts - self._remaining_restarts
args: Dict[int, Tuple] = {}
envs: Dict[int, Dict[str, str]] = {}
## 为worker 进程准备 环境变量 和启动参数
for worker in worker_group.workers:
local_rank = worker.local_rank
worker_env = {
"LOCAL_RANK": str(local_rank),"RANK": str(worker.global_rank),"GROUP_RANK": str(worker_group.group_rank),
"LOCAL_WORLD_SIZE": str(spec.local_world_size),"WORLD_SIZE": str(worker.world_size),
"MASTER_ADDR": master_addr,"MASTER_PORT": str(master_port),
...
}
envs[local_rank] = worker_env
worker_args = list(spec.args)
worker_args = macros.substitute(worker_args, str(local_rank))
args[local_rank] = tuple(worker_args)
self._pcontext = start_processes(
entrypoint=spec.entrypoint,
args=args,envs=envs,log_dir=attempt_log_dir,
start_method=self._start_method,...)
return self._pcontext.pids()
PContext 就是一个抽象类,有两个派生类:MultiprocessContext 和 SubprocessContext。前文提到,start_processes 参数之中,entrypoint和args 是用户命令和参数,entrypoint可以是函数或者字符串。如果entrypoint是函数,则使用MultiprocessContext。如果是字符串类型,使用SubprocessContext。
- LocalElasticAgent._pcontext 保存了 MultiprocessContext,MultiprocessContext._pc 保存了 ProcessContext。
- 监控时候,LocalElasticAgent._monitor_workers 调用了 MultiprocessContext.wait,MultiprocessContext 又调用了 ProcessContext.join,ProcessContext.join 具体监控进程的运行状态,这样完成了监控的整体逻辑。
- 子进程有变化或者超时之后,ProcessContext.join 返回了进程结果,MultiprocessContext.wait 把进程结果转发回去,_monitor_workers 把进程结果转换为 WorkerState.SUCCEEDED 或者 WorkerState.FAILED。
容错
PyTorch 分布式之弹性训练(6)—监控/容错新一轮 rendezvous 会让其他 agent 也重启它们的worker。这是如何做到的?具体如下:
- Agent 0(故障Agent)通过 monitoring 发现了故障。
- Agent 0 调用 _restart_workers 重启worker。
- Agent 0 会调用 next_rendezvous 发起新一轮 rendezvous。
- Agent 0 在做任何操作之前,比如 keep alive 操作之前,会调用 sync 来从kvstore获取集群信息,这样可以保证 Agent拿到的是集群最新状态。
- Agent 0 会把自己加入到本地的 waiting_list 之中。
- Agent 0 同时会调用 mark_dirty,意思是我状态更新了,需要写入KVStore。
- Agent 0 会调用sync把自己的waiting_list 被写入 KVStore。
- Agent 1(其他正常工作的 agent)会在做任何操作之前,比如 keep alive 操作之前,会调用 sync 操作从KVStore 获取最新信息。
- Agent 1 利用这些信息来更新自己的状态,这样本地 waiting_list 就会更新。
- Agent 1 的 train loop 在每 30 秒监控之后,因为系统正常,是 Healthy 状态。
- Agent 1 所以调用 num_nodes_waiting() 看看 waiting_list 数目。
- Agent 1 会获取本地 waiting list 的数目。
- 如果 waiting list 不为空,也调用_restart_workers。
- 其最终会调用next_rendezvous。
- 启动一个新 worker。
- 调用 next_rendezvous,发起新一轮 rendezvous。
- _RendezvousJoinOp 内部运行,生成 ADD_TO_WAIT_LIST。
- executor.run 内部运行 _add_to_wait_list。
- 往 wait_list 添加一个新的 node。
- Agent 之中,定期(比如 30S)运行一次 _monitor_workers,获取worker 子进程状态。
- 如果是 HEALTHY,则调用num_nodes_waiting 获取 wait_list 个数。
- 如果 wait_list 之中等待节点数目大于 0,则:
- 调用 _restart_workers 重启进程组。
节点级容错: elastic agent 注册了 signal.signal(signal.SIGTERM, _terminate_process_handler)
之后 引起 rdzv_handler.shutdown()
与horovod 对比
其它
run.py 启动命令
python -m torch.distributed.run [args] training_script [training_script_args]
,包含以下参数
关于workGroup 的布局
--nnodes
--nproc_per_node
关于集会机制--rdzv_backend
- The backend of the rendezvous (e.g.c10d
). This is typically a strongly consistent key-value store.--rdzv_endpoint
- The rendezvous backend endpoint; usually in form<host>:<port>
.--rdzv_id
- A user-defined id that uniquely identifies the worker group for a job. This id is used by each node to join as a member of a particular worker group.--rdzv_conf
--standalone
Standalone 模式是分布式模式的一种特例,它主要针对单机多 Worker 的方式提供了一些便利的设置,指定Standalone后 不再需要设置一些多余的参数如 rdzv_backend 和 rdzv_endpoint 等。 User-code launch related arguments.--max_restarts
--monitor_interval
监听 worker 状态的间隔--start_method
可选值["spawn", "fork", "forkserver"]
,默认值spawn-
--role
以下为兼容 老版 launch.py 提供的参数 --node_rank
--master_addr
--master_port
如何启动 train_script
train_script 启动方式有以下几种
- 直接 python 启动 比较重要的参数是local_rank,world_size,master_host,master_port, backend
- WORLD_SIZE, 数据分几份
- RANK, 自己是第几份,自己是不是master(rank=0 是master)
- NPROC_PER_NODE,
RANK / NPROC_PER_NODE
可以推算 多卡里用哪一张卡 - MASTER_ADDR:MASTER_PORT, 如何发现其它成员,组建process_group时需要
- 由 launch.py 启动,大部分参数直接 传给 train_script.py,launch根据 nnodes 和 nproc_per_node 算了一下 local_rank 和 world_size 传给train_script。
python -m torch.distributed.launch --nnodes=2 --node_rank=xx --nproc_per_node=xx --master_addr=xx --master_port=xx TRAINING_SCRIPT.py (... train script args ...)
。 这个方法已经淘汰,并且在不同的时期 launch.py 的实现有变化。 - 由 run.py 启动,参数 都是给 rendezvous 用的, 由rendezvous 协商出 train_script 需要的参数,由 run.py spawn worker 进程时传给worker/传给train_script。
python -m torchelastic.distributed.run --nnodes=1:4 --nproc_per_node=xx --rdzv_id=JOB_ID --rdzv_backend=etcd --rdzv_endpoint=ETCD_HOST:ETCD_PORT TRAINING_SCRIPT.py (... train script args ...)
launch 和 run 仅仅是为启动 train_script 方便,该给 train_script 传的参数还是要传,不传参数用环境变量也行,传参或传环境变量需要的参数是一致的,可以到launch.py 或 run.py 源码下查看其启动需要哪些参数。PS:train_script 是算法写的,在train_script 内使用的库 具备分布式、弹性 能力之前,能力扩充 主要通过 加壳子的方式来解决。
python直接 启动 train_script 需要以下参数(来自run.py 代码注释)
LOCAL_RANK
- The local rank.RANK
- The global rank.GROUP_RANK
- The rank of the worker group. A number between 0 andmax_nnodes
. When running a single worker group per node, this is the rank of the node.ROLE_RANK
- The rank of the worker across all the workers that have the same role. The role of the worker is specified in theWorkerSpec
.LOCAL_WORLD_SIZE
- The local world size (e.g. number of workers running locally); equals to--nproc_per_node
specified ontorchrun
.WORLD_SIZE
- The world size (total number of workers in the job).ROLE_WORLD_SIZE
- The total number of workers that was launched with the same role specified inWorkerSpec
.MASTER_ADDR
- The FQDN of the host that is running worker with rank 0; used to initialize the Torch Distributed backend.MASTER_PORT
- The port on theMASTER_ADDR
that can be used to host the C10d TCP store.TORCHELASTIC_RESTART_COUNT
- The number of worker group restarts so far.TORCHELASTIC_MAX_RESTARTS
- The configured maximum number of restarts.TORCHELASTIC_RUN_ID
- Equal to the rendezvousrun_id
(e.g. unique job id).