分布式通讯包 - torch.distributed
torch.distributed
提供了一种类似MPI的接口,用于跨多机器网络交换张量数据。它支持几种不同的后端和初始化方法。
目前,torch.distributed
支持三个后端,每个后端具有不同的功能。下表显示哪些功能可用于CPU / CUDA张量。只有当用于构建PyTorch的实现支持它时,MPI才支持cuda。
后端 | tcp |
gloo |
mpi |
|||
---|---|---|---|---|---|---|
设备 | 中央处理器 | GPU | 中央处理器 | GPU | 中央处理器 | GPU |
--- | --- | --- | --- | --- | --- | --- |
发送 | ✓ | ✘ | ✘ | ✘ | ✓ | ? |
的recv | ✓ | ✘ | ✘ | ✘ | ✓ | ? |
广播 | ✓ | ✘ | ✓ | ✓ | ✓ | ? |
all_reduce | ✓ | ✘ | ✓ | ✓ | ✓ | ? |
减少 | ✓ | ✘ | ✘ | ✘ | ✓ | ? |
all_gather | ✓ | ✘ | ✘ | ✘ | ✓ | ? |
收集 | ✓ | ✘ | ✘ | ✘ | ✓ | ? |
分散 | ✓ | ✘ | ✘ | ✘ | ✓ | ? |
屏障 | ✓ | ✘ | ✓ | ✓ | ✓ | ? |
基本
所述torch.distributed
包提供跨在一个或多个计算机上运行的几个计算节点对多进程并行PyTorch支持与通信原语。该类torch.nn.parallel.DistributedDataParallel()基于此功能,提供同步分布式培训作为围绕任何PyTorch模型的包装器。这不同于所提供的类型的并行的 :模块:torch.multiprocessing
和torch.nn.DataParallel()在它支持多个网络连接的机器和在用户必须明确地启动主训练脚本的单独副本为每个进程。
在单机同步情况下,torch.distributed或 torch.nn.parallel.DistributedDataParallel()wrapper可能仍然具有优于数据并行性的其他方法的优点,包括torch.nn.DataParallel():
- 每个进程维护自己的优化器,并且每次迭代执行完整的优化步骤。虽然这可能是冗余的,因为梯度已经被聚集在一起,并且在进程之间进行平均,因此对于每个进程是相同的,这意味着不需要参数广播步骤,从而减少在节点之间传送张量所花费的时间。
- 每个进程都包含一个独立的Python解释器,消除了来自单个Python进程中驱动多个执行线程,模型副本或GPU的额外的解释器开销和“GIL-thrashing”。这对于大量使用Python运行时的模型尤其重要,包括具有循环层或多个小组件的模型。
初始化
torch.distributed.init_process_group() 在调用任何其他方法之前,需要使用该函数初始化该包。这将阻止所有进程加入。
torch.distributed.init_process_group(backend, init_method='env://', kwargs)
初始化分布式包。
参数:
- backend (str) - 要使用的后端的名称。根据构建时配置有效值包括:
tcp
,mpi
和gloo
。 - initmethod([_str](https://docs.python.org/2/library/functions.html#str)_,__可选_) - 指定如何初始化包的URL。
- worldsize([_int](https://docs.python.org/2/library/functions.html#int)_,__可选_) - 参与作业的进程数。
- rank(int,__可选) - 当前进程的等级。
- groupname([_str](https://docs.python.org/2/library/functions.html#str)_,__可选_) - 组名称。请参阅init方法的说明。
torch.distributed.get_rank()
返回当前进程的排名。
Rank是分配给分布式组中每个进程的唯一标识符。它们总是连续的整数,范围从0到world_size
。
torch.distributed.get_world_size()
返回分布式组中的进程数。
目前支持三种初始化方式:
TCP初始化
有两种方法来初始化使用TCP,这两种方法都需要可以从所有进程访问的网络地址和所需的world_size。第一种方法需要指定属于等级0进程的地址。第一种初始化方式要求所有进程都具有手动指定的等级。
或者,地址必须是有效的IP多播地址,在这种情况下可以自动分配等级。组播初始化还支持一个group_name参数,只要使用不同的组名,就可以为多个作业使用相同的地址。
import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4)
# or a multicast address - rank will be assigned automatically if unspecified
dist.init_process_group(init_method='tcp://[ff15:1e18:5d4c:4cf0:d02d:b659:53ba:b0a7]:23456',
world_size=4)
共享文件系统初始化
另一个初始化方法利用一个文件系统,该文件系统可以从组中的所有计算机共享和可见,以及所需的world_size。该URL应file://以共享文件系统上的不存在的文件(在现有目录中)开头并包含一个路径。此初始化方法也支持一个 group_name参数,只要使用不同的组名,就可以使用相同的共享文件路径进行多个作业。
警告
该方法假设文件系统支持使用fcntl大多数本地系统进行锁定,NFS支持它。
import torch.distributed as dist
# Rank will be assigned automatically if unspecified
dist.init_process_group(init_method='file:///mnt/nfs/sharedfile', world_size=4,
group_name=args.group)
环境变量初始化
该方法将从环境变量中读取配置,从而可以完全自定义获取信息的方式。要设置的变量是:
- MASTER_PORT - 必需 必须是排名0的机器上的免费端口
- MASTER_ADDR - 必需(等级0除外); 地址0级节点
- WORLD_SIZE - 必需 可以在这里设置,也可以调用init函数
- RANK - 必需 可以在这里设置,也可以调用init函数
等级为0的机器将用于设置所有连接。
这是默认方法,这意味着init_method不必指定(或可以env://)。
Groups组
默认集合在默认组(也称为世界)上运行,并要求所有进程进入分布式函数调用。然而,一些工作负载可以从更细粒度的通信中受益。这就是分布式组织发挥作用的地方。new_group()函数可用于创建新组,具有所有进程的任意子集。它返回一个不透明的组句柄,可以作为group参数给予所有集合(集合是分布式函数以在某些众所周知的编程模式中交换信息)。
torch.distributed.new_group(_rank = None)
创建一个新的分布式组。
此功能要求主组中的所有进程(即作为分布式作业的一部分的所有进程)都将输入此功能,即使它们不会是组的成员。此外,应在所有进程中以相同的顺序创建组。
参数:
- ranks (list[int]) - 团体成员名单。 返回: 分配组的句柄可以被赋予集体调用。
点对点通信
torch.distributed.send(tensor, dst)
同步发送张量。
参数:
torch.distributed.recv(tensor, src=None)
同步接收张量。
参数:
isend()并irecv() 在使用时返回分布式请求对象。一般来说,此对象的类型是未指定的,因为它们不应该手动创建,而是保证支持两种方法:
- is_completed() - 如果操作完成,返回True
- wait() - 将阻止进程,直到操作完成。 is_completed()保证在返回True后返回True。
torch.distributed.isend(tensor, dst)
以异步方式发送张量。
参数:
返回:
分布式请求对象。
torch.distributed.irecv(tensor, src)
以异步方式接收张量。
参数:
返回: 分布式请求对象。
集体功能
torch.distributed.broadcast(tensor, src, group=<object object="">)</object>
向整个组播放张量。
tensor
在参与集体的所有过程中必须具有相同数量的元素。
参数:
torch.distributed.all_reduce(tensor, op=<object object="">, group=<object object="">)</object></object>
减少所有机器上的张量数据,使得所有机器都得到最终结果。
tensor
在所有进程中,呼叫将是相同的。
参数:
- 张量(张量) - 集体的输入和输出。该功能就地运行。
- op(可选) -
torch.distributed.reduce_op
枚举中的一个值。指定用于元素方式减少的操作。 - 集团(可选) - 集体集团。
torch.distributed.reduce(tensor, dst, op=<object object="">, group=<object object="">)</object></object>
减少所有机器的张量数据。
只有排名过程dst
才会得到最终结果。
参数:
- 张量(张量) - 集体的输入和输出。该功能就地运行。
- op(可选) -
torch.distributed.reduce_op
枚举中的一个值。指定用于元素方式减少的操作。 - 集团(可选) - 集体集团。
torch.distributed.all_gather(tensor_list, tensor, group=<object object="">)</object>
从列表中收集整个组的张量。
参数:
torch.distributed.gather(tensor, kwargs)
在单个过程中收集张量列表。
参数:
- 张量(Tensor) - 输入张量。
- dst(int) - 目的地排名。除了接收数据之外的所有进程都需要。
- gatherlist(list _[ Tensor ]) - 用于接收数据的适当大小的张量的列表。仅在接收过程中需要。
- 集团(可选) - 集体集团。
torch.distributed.scatter(tensor, kwargs)
向组中的所有进程散布张量列表。
每个进程将只收到一个张量,并将其数据存储在 tensor
参数中。
参数:
- 张量(张量) - 输出张量。
- src(int) - 源排名。除了发送数据之外的所有进程都需要。
- scatterlist(list _[ Tensor ]) - 要分散的张量列表。只有在发送数据的过程中才需要。
- 集团(可选) - 集体集团。
torch.distributed.barrier(group=<object object="">)</object>
同步所有进程。
这个集体阻止进程,直到整个组进入这个功能。
参数: group (可选) - Group of the collective.
译者署名
用户名 | 头像 | 职能 | 签名 |
---|---|---|---|
Song | 翻译 | 人生总要追求点什么 |