网易首页 > 网易号 > 正文 申请入驻

PyTorch 并行训练 DistributedDataParallel完整代码示例

0
分享至

使用大型数据集训练大型深度神经网络 (DNN) 的问题是深度学习领域的主要挑战。 随着 DNN 和数据集规模的增加,训练这些模型的计算和内存需求也会增加。 这使得在计算资源有限的单台机器上训练这些模型变得困难甚至不可能。 使用大型数据集训练大型 DNN 的一些主要挑战包括:

  • 训练时间长:训练过程可能需要数周甚至数月才能完成,具体取决于模型的复杂性和数据集的大小。
  • 内存限制:大型 DNN 可能需要大量内存来存储训练期间的所有模型参数、梯度和中间激活。 这可能会导致内存不足错误并限制可在单台机器上训练的模型的大小。

为了应对这些挑战,已经开发了各种技术来扩大具有大型数据集的大型 DNN 的训练,包括模型并行性、数据并行性和混合并行性,以及硬件、软件和算法的优化。

在本文中我们将演示使用 PyTorch 的数据并行性和模型并行性。

我们所说的并行性一般是指在多个gpu,或多台机器上训练深度神经网络(dnn),以实现更少的训练时间。数据并行背后的基本思想是将训练数据分成更小的块,让每个GPU或机器处理一个单独的数据块。然后将每个节点的结果组合起来,用于更新模型参数。在数据并行中,模型体系结构在每个节点上是相同的,但模型参数在节点之间进行了分区。每个节点使用分配的数据块训练自己的本地模型,在每次训练迭代结束时,模型参数在所有节点之间同步。这个过程不断重复,直到模型收敛到一个令人满意的结果。

下面我们用用ResNet50和CIFAR10数据集来进行完整的代码示例:

在数据并行中,模型架构在每个节点上保持相同,但模型参数在节点之间进行了分区,每个节点使用分配的数据块训练自己的本地模型。

PyTorch的DistributedDataParallel 库可以进行跨节点的梯度和模型参数的高效通信和同步,实现分布式训练。本文提供了如何使用ResNet50和CIFAR10数据集使用PyTorch实现数据并行的示例,其中代码在多个gpu或机器上运行,每台机器处理训练数据的一个子集。训练过程使用PyTorch的DistributedDataParallel 库进行并行化。

导入必须要的库

import os
from datetime import datetime
from time import time
import argparse
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel

接下来,我们将检查GPU

import subprocess
result = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
print(result.stdout.decode())

因为我们需要在多个服务器上运行,所以手动一个一个执行并不现实,所以需要有一个调度程序。这里我们使用SLURM文件来运行代码(slurm面向Linux和Unix类似内核的免费和开源工作调度程序),

def main():
# get distributed configuration from Slurm environment
parser = argparse.ArgumentParser()
parser.add_argument('-b', '--batch-size', default=128, type =int,
help='batch size. it will be divided in mini-batch for each worker')
parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
help='number of total epochs to run')
parser.add_argument('-c','--checkpoint', default=None, type=str,
help='path to checkpoint to load')
args = parser.parse_args()
rank = int(os.environ['SLURM_PROCID'])
local_rank = int(os.environ['SLURM_LOCALID'])
size = int(os.environ['SLURM_NTASKS'])
master_addr = os.environ["SLURM_SRUN_COMM_HOST"]
port = "29500"
node_id = os.environ['SLURM_NODEID']
ddp_arg = [rank, local_rank, size, master_addr, port, node_id]
train(args, ddp_arg)

然后我们使用DistributedDataParallel 库来执行分布式训练。

def train(args, ddp_arg):
rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg
# display info
if rank == 0:
#print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
#print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
# configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
#dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
dist.init_process_group(
backend='nccl',
init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
world_size=size,
rank=rank
)
# distribute model
torch.cuda.set_device(local_rank)
gpu = torch.device("cuda")
#model = ResNet18(classes=10).to(gpu)
model = torchvision.models.resnet50(pretrained=False).to(gpu)
ddp_model = DistributedDataParallel(model, device_ids=[local_rank])
if args.checkpoint is not None:
map_location = {'cuda:%d' % 0: 'cuda:%d' % local_rank}
ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
# distribute batch size (mini-batch)
batch_size = args.batch_size
batch_size_per_gpu = batch_size // size
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# load data with distributed sampler
#train_dataset = torchvision.datasets.CIFAR10(root='./data',
# train=True,
# transform=transform_train,
# download=False)
# load data with distributed sampler
train_dataset = torchvision.datasets.CIFAR10(root='./data',
train=True,
transform=transform_train,
download=False)
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
num_replicas=size,
rank=rank)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size_per_gpu,
shuffle=False,
num_workers=0,
pin_memory=True,
sampler=train_sampler)
# training (timers and display handled by process 0)
if rank == 0: start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
if rank == 0: start_dataload = time()
for i, (images, labels) in enumerate(train_loader):
# distribution of images and labels to all GPUs
images = images.to(gpu, non_blocking=True)
labels = labels.to(gpu, non_blocking=True)
if rank == 0: stop_dataload = time()
if rank == 0: start_training = time()
# forward pass
outputs = ddp_model(images)
loss = criterion(outputs, labels)
# backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
if rank == 0: stop_training = time()
if (i + 1) % 10 == 0 and rank == 0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch + 1, args.epochs,
i + 1, total_step, loss.item(), (stop_dataload - start_dataload)*1000,
(stop_training - start_training)*1000))
if rank == 0: start_dataload = time()
#Save checkpoint at every end of epoch
if rank == 0:
torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
if rank == 0:
print(">>> Training complete in: " + str(datetime.now() - start))
if __name__ == '__main__':
main()

代码将数据和模型分割到多个gpu上,并以分布式的方式更新模型。下面是代码的一些解释:

train(args, ddp_arg)有两个参数,args和ddp_arg,其中args是传递给脚本的命令行参数,ddp_arg包含分布式训练相关参数。

rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分布式训练相关参数。

如果rank为0,则打印当前使用的gpu数量和主节点IP地址信息。

dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL后端初始化分布式进程组。

torch.cuda.set_device(local_rank):为这个进程选择指定的GPU。

model = torchvision.models. ResNet50 (pretrained=False).to(gpu):从torchvision模型中加载ResNet50模型,并将其移动到指定的gpu。

ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):将模型包装在DistributedDataParallel模块中,也就是说这样我们就可以进行分布式训练了

加载CIFAR-10数据集并应用数据增强转换。

train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):创建一个DistributedSampler对象,将数据集分割到多个gpu上。

train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler):创建一个DataLoader对象,数据将批量加载到模型中,这与我们平常训练的步骤是一致的只不过是增加了一个分布式的数据采样DistributedSampler

为指定的epoch数训练模型,以分布式的方式使用optimizer.step()更新权重。

rank0在每个轮次结束时保存一个检查点。

rank0每10个批次显示损失和训练时间。

结束训练时打印训练模型所花费的总时间也是在rank0上。

代码测试

在使用1个节点1/2/3/4个gpu, 2个节点6/8个gpu,每个节点3/4个gpu上进行了训练Cifar10上的Resnet50的测试如下图所示,每次测试的批处理大小保持不变。完成每项测试所花费的时间以秒为单位记录。随着使用的gpu数量的增加,完成测试所需的时间会减少。当使用8个gpu时,需要320秒才能完成,这是记录中最快的时间。这是肯定的,但是我们可以看到训练的速度并没有像GPU数量增长呈现线性的增长,这可能是因为Resnet50算是一个比较小的模型了,并不需要进行并行化训练。

在多个gpu上使用数据并行可以显著减少在给定数据集上训练深度神经网络(DNN)所需的时间。随着gpu数量的增加,完成训练过程所需的时间减少,这表明DNN可以更有效地并行训练。

这种方法在处理大型数据集或复杂的DNN架构时特别有用。通过利用多个gpu,可以加快训练过程,实现更快的模型迭代和实验。但是需要注意的是,通过Data Parallelism实现的性能提升可能会受到通信开销和GPU内存限制等因素的限制,需要仔细调优才能获得最佳结果。

https://avoid.overfit.cn/post/67095b9014cb40888238b84fea17e872

作者:Joseph El Kettaneh

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
生母离世,生父未知,上海81岁外公卖房还债租房照顾2个未成年外籍外孙女,“在我之后,谁来照顾她们?”

生母离世,生父未知,上海81岁外公卖房还债租房照顾2个未成年外籍外孙女,“在我之后,谁来照顾她们?”

大风新闻
2026-03-16 23:07:07
被央媒点名批评!张本智和拒绝说重庆话跟中国球迷互动 数典忘祖

被央媒点名批评!张本智和拒绝说重庆话跟中国球迷互动 数典忘祖

念洲
2026-03-17 20:16:42
提醒:肺癌早期不是咳嗽,而是身上出现这5大异常,不要忽视

提醒:肺癌早期不是咳嗽,而是身上出现这5大异常,不要忽视

袁医生课堂
2026-03-08 09:33:05
租女友回家时,当书记的爸却笑了:丫头啥时候调到市委工作了?

租女友回家时,当书记的爸却笑了:丫头啥时候调到市委工作了?

阿凯销售场
2026-03-16 15:36:40
女子和傻子订了娃娃亲,她嫁给傻子后傻子破产,她炒股赚了几十亿

女子和傻子订了娃娃亲,她嫁给傻子后傻子破产,她炒股赚了几十亿

乔生桂
2026-01-23 16:16:34
千万不要和不是同一消费水平的朋友一起去旅游!去一次就受够了!

千万不要和不是同一消费水平的朋友一起去旅游!去一次就受够了!

深度报
2026-03-11 22:31:05
先开一把,内马尔确定落选巴西队大名单后在社媒晒打CS照片

先开一把,内马尔确定落选巴西队大名单后在社媒晒打CS照片

懂球帝
2026-03-17 10:17:10
女足1-2刚刚遭淘汰,主教练下课!为了中国足球,一分违约金不要

女足1-2刚刚遭淘汰,主教练下课!为了中国足球,一分违约金不要

越岭寻踪
2026-03-18 00:38:24
磋商6小时,特朗普收到坏消息?中国态度强硬,直接给美方定性!

磋商6小时,特朗普收到坏消息?中国态度强硬,直接给美方定性!

达文西看世界
2026-03-17 10:23:35
今年最“烂大街”的5种打扮,件件寒酸廉价又俗气,快看你有吗

今年最“烂大街”的5种打扮,件件寒酸廉价又俗气,快看你有吗

梦仙境
2026-03-05 00:43:51
哈尔滨市发布大风蓝色预警、道路冰雪黄色预警

哈尔滨市发布大风蓝色预警、道路冰雪黄色预警

人民资讯
2026-03-17 16:53:50
伊朗伊斯兰革命卫队:将全力追捕内塔尼亚胡,并将其击毙!瑞士拒绝两架美军机飞越领空,美将驻日两栖远征部队调往中东

伊朗伊斯兰革命卫队:将全力追捕内塔尼亚胡,并将其击毙!瑞士拒绝两架美军机飞越领空,美将驻日两栖远征部队调往中东

每日经济新闻
2026-03-15 15:45:26
能让女人肉体臣服、灵魂沦陷的男人,无非这几种

能让女人肉体臣服、灵魂沦陷的男人,无非这几种

萧狡科普解说
2026-03-17 13:33:37
伊朗炸美国银行,火光堪比9·11,不给美国喘息机会

伊朗炸美国银行,火光堪比9·11,不给美国喘息机会

胡同里有只猫A
2026-03-17 12:03:32
广东111-89战胜新疆 球员评价:7人优秀,3人及格,2人低迷

广东111-89战胜新疆 球员评价:7人优秀,3人及格,2人低迷

篮球资讯达人
2026-03-18 01:01:14
强援降临!伊朗采购36架歼-10,沙特60亿抢货,美媒:巴铁是中间人

强援降临!伊朗采购36架歼-10,沙特60亿抢货,美媒:巴铁是中间人

科技虎虎
2026-03-17 15:13:36
油价调整时间明确!这次涨价有多猛?预计涨幅超上次!

油价调整时间明确!这次涨价有多猛?预计涨幅超上次!

新浪财经
2026-03-17 15:08:42
1958年,张国焘请求中央给予他补助,毛主席同意,但提出一个条件

1958年,张国焘请求中央给予他补助,毛主席同意,但提出一个条件

帝哥说史
2026-01-17 06:40:03
资本正从迪拜逃往其他金融中心,比如香港

资本正从迪拜逃往其他金融中心,比如香港

碳基生物关怀组织
2026-03-17 22:55:15
哈佛研究发现:便秘超过3天或更久,肠道有益菌下降,大脑比同龄人老3年

哈佛研究发现:便秘超过3天或更久,肠道有益菌下降,大脑比同龄人老3年

肠菌科普
2026-03-14 07:02:43
2026-03-18 04:44:49
deephub incentive-icons
deephub
CV NLP和数据挖掘知识
1951文章数 1458关注度
往期回顾 全部

科技要闻

3万字实录|黄仁勋:每家公司都必须懂养虾

头条要闻

特朗普:伊朗2名高级官员被清除 1人为最高层人物

头条要闻

特朗普:伊朗2名高级官员被清除 1人为最高层人物

体育要闻

那个男人34岁拒绝买断 他要给状元当导师

娱乐要闻

姚晨曹郁官宣离婚,评论区全是冷嘲热讽

财经要闻

多款药品被曝线上线下价差巨大

汽车要闻

10分钟电量20%→97% 低温实测比亚迪闪充

态度原创

健康
亲子
旅游
公开课
军事航空

转头就晕的耳石症,能开车上班吗?

亲子要闻

这些细节让你立刻识别“妈味”女人!真的可以看得出来?

旅游要闻

@广大游客,这份昆明春日“赏花、食花、购花”消费提示请查收

公开课

李玫瑾:为什么性格比能力更重要?

军事要闻

拉里贾尼生死罗生门:以军称其身亡 他本人社媒账号发信

无障碍浏览 进入关怀版