每次迭代中,每个进程具有自己的
optimizer,并独立完成所有的优化步骤,进程内与一般的训练相一致。每个进程对应一个独立的训练过程,只有对梯度等少量数据进行信息交换。在
nn.DataParallel 中, 全程维护一个 optimizer,然后梯度求和,然后在主gpu上进行参数更新,再将更新后的参数广播到其他gpu上。比较而言,前者传输数据量更少,因此速度更快,效率更高在使用多进程时,每个进程有自己计算得到的loss,记录数据时 希望对不同进程上的loss取平均,其它数据也是想要平均。这时需要用到api如下,详细的参见源码
def all_reduce(tensor, op=ReduceOp.SUM, group=group.WORLD, async_op=False):
    """
    Reduces the tensor data across all machines in such a way that all get
    the final result.
    """
由于使用 DDP 后,模型在每个GPU上都复制了一份,同时被封装了一层。所以保存模型时只需要保存 master 节点的模型,并将平时的
model 变成 model.module,具体如下:if dist.get_rank()==0:
    torch.save(model.module.state_dict(), "{}.ckpt".format(str(epoch)))
if dist.get_rank() == 0 and ckpt_path is not None:
    model.load_state_dict(torch.load(ckpt_path))
补充
- 每个进程包含独立的解释器和GIL
 一般使用的Python解释器 CPython:是用C语言实现Python,是目前应用最广泛的解释器。全局锁使Python在多线程效果升表现不佳。全局解释器锁(Global Interpreter Lock)是Python用于同步线程的工具,使得任何时刻仅有一个线程在执行。
 由于每个进程拥有独立的解释器和GIL,消除了来自单个Python进程中的多个执行线程,模型副本或GPU的额外解释器开销、线程颠簸,因此可以减少解释器和GIL使用冲突。这对于严重依赖 Python runtime 的 models 而言,比如说包含 RNN 层或大量小组件的 models 而言,这尤为重要。
使用方式----代码编写
import os
import argparse
import torch
import torch.nn as nn
import torch.distributed as dist
def parse():
   parser = argparse.ArgumentParser()
   parser.add_argument('--local_rank', type=int, default=0)
   args = parser.parse_args()
   return args
   
def reduce_tensor(tensor):
   rt = tensor.clone()
   dist.all_reduce(rt, op=dist.reduce_op.SUM)
   rt /= dist.get_world_size()
   return rt
def record_loss(loss):
   reduced_loss = reduce_tensor(loss.data)
   train_epoch_loss += reduced_loss.item()
   # 注意在写入TensorBoard的时候只让一个进程写入就够了:
   # TensorBoard
   if args.local_rank == 0:
       writer.add_scalars('Loss/training', {'train_loss': train_epoch_loss,
                                                                            'val_loss': val_epoch_loss}, epoch + 1)
def main():
   """=============================================================
   在启动器启动python脚本后,会通过参数 local_rank 来告诉当前进程使用的是哪个GPU,
   用于在每个进程中指定不同的device
   ================================================================"""
   args = parse()
   torch.cuda.set_device(args.local_rank)
   dist.init_process_group(
   'nccl',                                               # 初始化GPU通信方式(NCCL)
   init_menthod='env://'              # 参数的获取方式(env代表通过环境变量)
   )
   """=============================================================
   分布式数据读取,具体使用方式, 参考 https://blog.csdn.net/magic_ll/article/details/123294552
   ================================================================"""
   train_dataset = ...
   train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
   train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
   """=======分布式模型的调用:包括SynBN========================================"""
   model = ...
   model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
   model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank])
   optimizer = optim.SGD(model.parameters())
   """=======训练===================================================="""
   for epoch in range(100):
       train_sampler.set_epoch(epoch)
       for batch_idx, (data, target) in enumerate(train_loader):
           images = images.cuda(non_blocking=True)
           target = target.cuda(non_blocking=True)
           ...
           output = model(images)
           loss = criterion(output, target)
           ...
           optimizer.zero_grad()
           loss.backward()
           optimizer.step()
           record_loss(loss)
代码的启动方式
- 在多进程的启动方面,不用自己手写 multiprocess 进行一系列复杂的CPU/GPU分配任务,PyTorch提供了一个很方便的启动器 torch.distributed.launch 用于启动文件,故运行训练代码的方式如下:
 CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py










