深度学习分布式训练collective通信操作及pytorch示例(代码片段)

BQW_ BQW_     2023-04-02     708

关键词:

相关博客
【深度学习】【分布式训练】Collective通信操作及Pytorch示例
【自然语言处理】【大模型】大语言模型BLOOM推理工具测试
【自然语言处理】【大模型】GLM-130B:一个开源双语预训练语言模型
【自然语言处理】【大模型】用于大型Transformer的8-bit矩阵乘法介绍
【自然语言处理】【大模型】BLOOM:一个176B参数且可开放获取的多语言模型

Collective通信操作及Pytorch示例

​ 大模型时代,单机已经无法完成先进模型的训练和推理,分布式训练和推理将会是必然的选择。各类分布式训练和推断工具都会使用到Collective通信。网络上大多数的教程仅简单介绍这些操作的原理,没有代码示例来辅助理解。本文会介绍各类Collective通信操作,并展示pytorch中如何使用

一、Collective通信操作

1. AllReduce

​ 将各个显卡的张量进行聚合(sum、min、max)后,再将结果写回至各个显卡。

2. Broadcast

​ 将张量从某张卡广播至所有卡。

3. Reduce

​ 执行同AllReduce相同的操作,但结果仅写入具有的某个显卡。

4. AllGather

​ 每个显卡上有一个大小为N的张量,共有k个显卡。经过AllGather后将所有显卡上的张量合并为一个 N × k N\\times k N×k的张量,然后将结果分配至所有显卡上。

5. ReduceScatter

​ 执行Reduce相同的操作,但是结果会被分散至不同的显卡。

二、Pytorch示例

​ pytorch的分布式包torch.distributed能够方便的实现跨进程和跨机器集群的并行计算。本文代码运行在单机双卡服务器上,并基于下面的模板来执行不同的分布式操作。

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def init_process(rank, size, fn, backend='nccl'):
    """
    为每个进程初始化分布式环境,保证相互之间可以通信,并调用函数fn。
    """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
    
    
def run(world_size, func):
    """
    启动world_size个进程,并执行函数func。
    """
    processes = []
    mp.set_start_method("spawn")
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, func))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
        
if __name__ == "__main__":
    run(2, func) # 这里的func随后会被替换为不同的分布式示例函数
    pass

​ 先对上面的模板做一些简单的介绍。

  • 函数run会根据传入的参数world_size,生成对应数量的进程。每个进程都会调用init_process来初始化分布式环境,并调用传入的分布式示例函数。
  • torch.distributed.init_process_group(),该方法负责各进程之间的初始协调,保证各进程都会与master进行握手。该方法在调用完成之前会一直阻塞,并且后续的所有操作都必须在该操作之后。调用该方法时需要初始化下面的4个环境变量:
    • MASTER_PORT:rank 0进程所在机器上的空闲端口;
    • MASTER_ADDR:rank 0进程所在机器上的IP地址;
    • WORLD_SIZE:进程总数;
    • RANK:每个进程的RANK,所以每个进程知道其是否是master;

1. 点对点通信

​ 在介绍其他collective通信之前,先看一个简单的点对点通信实现。

def p2p_block_func(rank, size):
    """
    将rank src上的tensor发送至rank dst(阻塞)。
    """
    src = 0
    dst = 1
    group = dist.new_group(list(range(size)))
    # 对于rank src,该tensor用于发送
    # 对于rank dst,该tensor用于接收
    tensor = torch.zeros(1).to(torch.device("cuda", rank))
    if rank == src:
        tensor += 1
        # 发送tensor([1.])
        # group指定了该操作所见进程的范围,默认情况下是整个world
        dist.send(tensor=tensor, dst=1, group=group)
    elif rank == dst:
        # rank dst的tensor初始化为tensor([0.]),但接收后为tensor([1.])
        dist.recv(tensor=tensor, src=0, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, p2p_block_func)

p2p_block_func实现从rank 0发送一个tensor([1.0])至rank 1,该操作在发送完成/接收完成之前都会阻塞。

​ 下面是一个不阻塞的版本:

def p2p_unblock_func(rank, size):
    """
    将rank src上的tensor发送至rank dst(非阻塞)。
    """
    src = 0
    dst = 1
    group = dist.new_group(list(range(size)))
    tensor = torch.zeros(1).to(torch.device("cuda", rank))
    if rank == src:
        tensor += 1
        # 非阻塞发送
        req = dist.isend(tensor=tensor, dst=dst, group=group)
        print("Rank 0 started sending")
    elif rank == dst:
        # 非阻塞接收
        req = dist.irecv(tensor=tensor, src=src, group=group)
        print("Rank 1 started receiving")
    req.wait()
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, p2p_unblock_func)

p2p_unblock_func是非阻塞版本的点对点通信。使用非阻塞方法时,因为不知道数据何时送达,所以在req.wait()完成之前不要对发送/接收的tensor进行任何操作。

2. Broadcast

def broadcast_func(rank, size):
    src = 0
    group = dist.new_group(list(range(size)))
    if rank == src:
        # 对于rank src,初始化tensor([1.])
        tensor = torch.zeros(1).to(torch.device("cuda", rank)) + 1
    else:
        # 对于非rank src,初始化tensor([0.])
        tensor = torch.zeros(1).to(torch.device("cuda", rank))
    # 对于rank src,broadcast是发送;否则,则是接收
    dist.broadcast(tensor=tensor, src=0, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, broadcast_func)

broadcast_func会将rank 0上的tensor([1.])广播至所有的rank上。

3. Reduce与Allreduce

def reduce_func(rank, size):
    dst = 1
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(1).to(torch.device("cuda", rank))
    # 对于所有rank都会发送, 但仅有dst会接收求和的结果
    dist.reduce(tensor, dst=dst, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, reduce_func)

reduce_func会对group中所有rank的tensor进行聚合,并将结果发送至rank dst。

def allreduce_func(rank, size):
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(1).to(torch.device("cuda", rank))
    # tensor即用来发送,也用来接收
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, allreduce_func)

allreduce_func将group中所有rank的tensor进行聚合,并将结果发送至group中的所有rank。

4. Gather与Allgather

def gather_func(rank, size):
    dst = 1
    group = dist.new_group(list(range(size)))
    # 该tensor用于发送
    tensor = torch.zeros(1).to(torch.device("cuda", rank)) + rank
    gather_list = []
    if rank == dst:
        # gather_list中的tensor数量应该是size个,用于接收其他rank发送来的tensor
        gather_list = [torch.zeros(1).to(torch.device("cuda", dst)) for _ in range(size)]
        # 仅在rank dst上需要指定gather_list
        dist.gather(tensor, gather_list=gather_list, dst=dst, group=group)
    else:
        # 非rank dst,相当于发送tensor
        dist.gather(tensor, dst=dst, group=group)
    print('Rank ', rank, ' has data ', gather_list)
    
if __name__ == "__main__":
    run(2, gather_func)

gather_func从group中所有rank上收集tensor,并发送至rank dst。(相当于不进行聚合操作的reduce)

def allgather_func(rank, size):
    group = dist.new_group(list(range(size)))
    # 该tensor用于发送
    tensor = torch.zeros(1).to(torch.device("cuda", rank)) + rank
    # gether_list用于接收各个rank发送来的tensor
    gather_list = [torch.zeros(1).to(torch.device("cuda", rank)) for _ in range(size)]
    dist.all_gather(gather_list, tensor, group=group)
    # 各个rank的gather_list均一致
    print('Rank ', rank, ' has data ', gather_list)
    
if __name__ == "__main__":
    run(2, allgather_func)

allgather_func从group中所有rank上收集tensor,并将收集到的tensor发送至所有group中的rank。

5. Scatter与ReduceScatter

def scatter_func(rank, size):
    src = 0
    group = dist.new_group(list(range(size)))
    # 各个rank用于接收的tensor
    tensor = torch.empty(1).to(torch.device("cuda", rank))
    if rank == src:
        # 在rank src上,将tensor_list中的tensor分发至不同的rank上
        # tensor_list:[tensor([1.]), tensor([2.])]
        tensor_list = [torch.tensor([i + 1], dtype=torch.float32).to(torch.device("cuda", rank)) for i in range(size)]
        # 将tensor_list发送至各个rank
        # 接收属于rank src的那部分tensor
        dist.scatter(tensor, scatter_list=tensor_list, src=0, group=group)
    else:
        # 接收属于对应rank的tensor
        dist.scatter(tensor, scatter_list=[], src=0, group=group)
    # 每个rank都拥有tensor_list中的一部分tensor
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, scatter_func)

scatter_func会将rank src中的一组tensor逐个分发至其他rank上,每个rank持有的tensor不同。

def reduce_scatter_func(rank, size):
    group = dist.new_group(list(range(size)))
    # 用于接收的tensor
    tensor = torch.empty(1).to(torch.device("cuda", rank))
    # 用于发送的tensor列表
    # 对于每个rank,有tensor_list=[tensor([0.]), tensor([1.])]
    tensor_list = [torch.Tensor([i]).to(torch.device("cuda", rank)) for i in range(size)]
    # step1. 经过reduce的操作会得到tensor列表[tensor([0.]), tensor([2.])]
    # step2. tensor列表[tensor([0.]), tensor([2.])]分发至各个rank
    # rank 0得到tensor([0.]),rank 1得到tensor([2.])
    dist.reduce_scatter(tensor, tensor_list, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, reduce_scatter_func)

参考资料

https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/collectives.html

https://pytorch.org/tutorials/intermediate/dist_tuto.html#collective-communication

https://pytorch.org/docs/stable/distributed.html#collective-functions

深度学习分布式策略优化显存优化通信优化编译优化综述

综述因为我个人最近在从事可能是AI领域对性能挑战最大的方向,自动驾驶领域,所以对整个深度学习训练的优化尤为关注,最近一直在学习相关内容,谨以此篇文章做一个总结。我一直很看好深度学习训练优化... 查看详情

深度学习分布式策略优化显存优化通信优化编译优化综述

...尝试对常见的优化技术做一个概述。具体结构如下:分布式策略优化前面介绍了在大模型时代,算力往往会成为短板,那 查看详情

深度学习分布式策略优化显存优化通信优化编译优化综述

...尝试对常见的优化技术做一个概述。具体结构如下:分布式策略优化前面介绍了在大模型时代,算力往往会成为短板,那 查看详情

深度学习中的分布式训练

1.为什么需要分布式训练随着人工智能与深度学习的发展,大规模和超大规模的模型越来越受到业界的推崇。以NLP行业为例,从最开始的Bert-base只有1亿左右的参数量,到千亿级别的GPT-3,再到今年6月发布的目前全球最大预训练模... 查看详情

数据并行:提升训练吞吐的高效方法|深度学习分布式训练专题

数据并行是大规模深度学习训练中非常成熟和常用的并行模式。本文将介绍数据并行的原理和主流实现方案,使用数据并行加速训练过程中需要注意的问题,以及如何优化数据并行进一步提高训练速度。希望能帮助用户... 查看详情

深度学习分布式训练小结

分布式训练本质上是为了加快模型的训练速度,面对较为复杂的深度学习模型以及大量的数据。单机单GPU很难在有限的时间内达成模型的收敛。这时候就需要用到分布式训练。分布式训练又分为模型并行和数据并行两大类。1... 查看详情

[源码解析]深度学习分布式训练框架horovod---后台线程架构(代码片段)

Horovod是Uber于2017年发布的一个易于使用的高性能的分布式训练框架,在业界得到了广泛应用。本文是系列第六篇,看看Horovod后台线程架构。[源码解析]深度学习分布式训练框架horovod(6)---后台线程架构目录[源码解析]深度学习分布... 查看详情

深度学习:batch_size和学习率及如何调整(代码片段)

...法则是,如果batchsize加倍,那么学习率就加倍。分布式训练时的batch_size:需要将batch_size/num_proces 查看详情

深度学习(五十五)tensorflow分布式训练

tensorflow分布式训练博客:http://blog.csdn.net/hjimce微博:黄锦池-hjimce   qq:1393852684情况一、单机单卡单机单卡是最普通的情况,当然也是最简单的,示例代码如下:#coding=utf-8#单机单卡#对于单机单卡,可以把参数和计算都... 查看详情

(1.8)深度学习实战——深度学习模型训练痛点及解决方法(代码片段)

...I领域,学习了手写字识别等几个demo后,就会发现深度学习模型训练是十分关键和有挑战性的。选定了网络结构后,深度学习训练过程基本大同小异,一般分为如下几个步骤定义算法公式,也就是神经网络的前... 查看详情

ctr预估中,可以有哪些深度模型,深度方法?

...据量显著上升,目前工业界的大多数场景都需要使用分布式的方式进行模型训练。今天来跟大家聊聊Tensorflow、Pytorch分布式训练的底层实现逻辑。有的算法同学可能会想,我只要深入研究模型就可以了,为什么还要了... 查看详情

如何用alluxio加速云上深度学习训练?

...快速度并且降低成本,许多企业开始逐步在云上实施分布式训练的方案,本期内容将结合阿里、微软等实际应用案例,分享如何通过Alluxio加速云上深度学习。内容主要围绕两个部分展开:内容概要:Alluxio及其 查看详情

深度学习——l0l1及l2范数

在深度学习中,监督类学习问题其实就是在规则化参数同时最小化误差。最小化误差目的是让模型拟合训练数据,而规则化参数的目的是防止模型过分拟合训练数据。参数太多,会导致模型复杂度上升,容易过拟合,也就是训练... 查看详情

深度学习框架量化感知训练的思考及oneflow的解决方案(代码片段)

...GiantPandaCV0x0.总览相信不少小伙伴都了解或者使用了一些深度学习框架比如PyTorch,TensorFlow,OneFlow(也是笔者目前正在参与开发的)。但当大家使用深度学习框架的训练量化方案时如果第一感觉是太复杂了,那... 查看详情

分类|深度学习(李宏毅)

一、生成模型假设有两个类别(C_1、C_2),(P(C_1|x)=fracP(x|C_1)P(C_1)P(x|C_1)P(C_1)+P(x|C_2)P(C_2))其中(P(C_1)、P(C_2))为先验分布,(P(x|C_1)、P(x|C_2))都是高斯分布,服从以下分布:[f_mu,Sigma(x)=fra 查看详情

浅谈gpu虚拟化和分布式深度学习框架的异同

撰文|袁进辉经常有人来问我:GPU虚拟化和分布式深度学习框架的异同,以及是不是用GPU虚拟化技术也可以解决现在超大规模深度学习模型的分布式训练难题。 这次不妨把我的观点简要总结并分享出来,只想知道结论... 查看详情

深度学习网络结构中超参数momentum了解

      训练网络时,通常先对网络的初始权值按照某种分布进行初始化,如:高斯分布。初始化权值操作对最终网络的性能影响比较大,合适的网络初始权值能够使得损失函数在训练过程中的收敛速度更快,... 查看详情

深度学习框架量化感知训练的思考及oneflow的一种解决方案(代码片段)

【GiantPandaCV导语】这篇文章分享的是笔者最近在OneFlow做的一个项目,将PytorchFX移植到OneFlow之后实现了自动量化感知训练动态图模型(在Pytorch和OneFlow中都称为nn.Module)。现在用户可以在自己构建的nn.Module基础上,修改很少... 查看详情