Distributed DataLoader For Pytorch Based On Ray

Overview

Dpex: a distributed data preprocessing component that is transparent to users

1. Introduction

As the compute gap between GPUs and CPUs keeps widening and the preprocessing pipelines used in model training grow ever more complex, CPU-side data preprocessing has gradually become the bottleneck of training, so upgrading the GPUs of a single machine no longer delivers the expected linear speedup. The essence of this bottleneck is that the CPU compute available to each GPU is limited. To address it, NVIDIA proposed a scale-up solution, the GPU data preprocessing library DALI; TensorFlow offers a scale-out solution, the distributed data preprocessing component DataService; and here we present a scale-out solution for the Pytorch ecosystem: the distributed data preprocessing component Dpex.

2. Architecture (the architecture of Pytorch's own DataLoader and of the DistDataLoader)

Dpex adopts the same architectural design as Pytorch's DataLoader and relies on Ray to schedule data preprocessing tasks onto other machine nodes for execution.
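
The sketch below illustrates the underlying idea: Ray remote tasks pull batches of indices, run Dataset.__getitem__ plus collate on whichever cluster node they land on, and ship the finished batches back to the training process. This is a simplified illustration of the scheduling pattern, not Dpex's actual internals; the SquaresDataset class and preprocess_batch function are made up for the example.

import ray
import torch
from torch.utils.data import Dataset
from torch.utils.data._utils.collate import default_collate

class SquaresDataset(Dataset):
    # toy dataset: item i is the pair (i, i^2)
    def __getitem__(self, index):
        return torch.tensor([float(index), float(index) ** 2])

    def __len__(self):
        return 1000

@ray.remote
def preprocess_batch(dataset, indices):
    # runs on whichever node Ray schedules it to; CPU-heavy transforms happen here
    samples = [dataset[i] for i in indices]
    return default_collate(samples)

ray.init(address="auto")  # join an existing Ray cluster (use ray.init() for a local test)
dataset = SquaresDataset()
futures = [preprocess_batch.remote(dataset, list(range(start, start + 100)))
           for start in range(0, len(dataset), 100)]
for batch in ray.get(futures):
    print(batch.shape)  # each finished batch comes back ready for the GPU, e.g. torch.Size([100, 2])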

3. Usage Examples

Dpex is compatible with Pytorch's DataLoader not only in design but also in implementation. For parallel data preprocessing, when distribute_mode=True, DpexDataLoader uses _RayDataLoaderIter to perform distributed data preprocessing; when it is set to False, DpexDataLoader falls back to Pytorch's own _MultiProcessingDataLoaderIter for parallel data preprocessing and loading. Using Dpex in Pytorch training is very simple: wherever you use Pytorch's DataLoader, replace it with DpexDataLoader from Dpex. When your training machine is itself a node of a Ray cluster, setting distribute_mode=True enables distributed data preprocessing. Below we give examples of using Dpex for single-GPU training, multi-GPU training with DataParallel, and multi-GPU training with DDP; see the test files for full details. The constructor signature of DpexDataLoader is:
class DpexDataLoader(torch.utils.data.DataLoader):
    def __init__(self, dataset: Dataset[T_co],
                 distribute_mode: Optional[bool] = False,
                 head_address="auto",
                 batch_size: Optional[int] = 1,
                 shuffle: bool = False,
                 sampler: Optional[Sampler[int]] = None,
                 batch_sampler: Optional[Sampler[Sequence[int]]] = None,
                 num_workers: int = 0,
                 collate_fn: Optional[_collate_fn_t] = None,
                 pin_memory: bool = False,
                 drop_last: bool = False,
                 timeout: float = 0,
                 worker_init_fn: Optional[_worker_init_fn_t] = None,
                 multiprocessing_context=None,
                 generator=None,
                 *, prefetch_factor: int = 2):

3.1 Single-GPU training

Below is example code using DpexDataLoader for single-GPU training; see the test code file for full details.

from torchvision import datasets
from torchvision.transforms import ToTensor
from Dpex import dataloader

training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor()
)
# use DpexDataLoader
train_loader = dataloader.DpexDataLoader(training_data, distribute_mode=True, num_workers=10, batch_size=100, shuffle=True)

for epoch in range(3):
    for index, (image, label) in enumerate(train_loader):
        # your train process
        pass

3.2 Multi-GPU training with DataParallel

Below is example code using DpexDataLoader with DataParallel for multi-GPU training; see the test code file for full details.

import torch
import torch.nn as nn
from torch.utils.data import Dataset
from Dpex import dataloader

class MyOwnDataset(Dataset):
    # implement __init__, __getitem__ and __len__ for your own data
    pass

# use DpexDataLoader
data_loader = dataloader.DpexDataLoader(dataset=MyOwnDataset(), distribute_mode=True,
                                        batch_size=100, shuffle=True, num_workers=10)

class Model(nn.Module):
    # your own model definition
    pass

model = Model()

if torch.cuda.is_available():
    model.cuda()

if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)

for data in data_loader:
    # train your own model
    pass

3.3 Multi-GPU training with DDP

Below is example code using DpexDataLoader with DDP (DistributedDataParallel) for multi-GPU training; see the test code file for full details.

import torch
import torch.nn as nn
from torch.utils.data import Dataset
from Dpex.dataloader import DpexDataLoader
from torch.utils.data.distributed import DistributedSampler

# start command: CUDA_VISIBLE_DEVICES=1,6,7 python -m torch.distributed.launch --nproc_per_node=2 pytorch_ddp.py
# 1) initialize the distributed process group
torch.distributed.init_process_group(backend="nccl")

input_size = 5
output_size = 2
batch_size = 1
data_size = 90000

# 2) set the GPU used by each process (using get_rank() as the local rank assumes a single node)
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

dataset = RandomDataset(input_size, data_size)
# 3) use DistributedSampler so each process sees its own shard of the data
rand_loader = DpexDataLoader(dataset=dataset, distribute_mode=True, batch_size=batch_size, sampler=DistributedSampler(dataset), num_workers=10)

class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print("  In Model: input size", input.size(),
              "output size", output.size())
        return output

model = Model(input_size, output_size)

# 4) move the model to its GPU before wrapping it
model.to(device)

if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # 5) wrap the model with DistributedDataParallel
    model = torch.nn.parallel.DistributedDataParallel(model,
                                                      device_ids=[local_rank],
                                                      output_device=local_rank)

for data in rand_loader:
    if torch.cuda.is_available():
        input_var = data.to(device)  # move the batch to this process's GPU
    else:
        input_var = data

    output = model(input_var)
    print("Outside: input size", input_var.size(), "output_size", output.size())

4. Benchmark

The following benchmarks focus on two aspects:

  • The impact of DpexDataLoader on model training accuracy
  • The impact of DpexDataLoader on model training speed

Dpex only scales single-machine data preprocessing out to multiple machines so that more CPU compute can be used to accelerate preprocessing; it does not change how the data itself is loaded and preprocessed, so it has no negative impact on model accuracy. For workloads where data preprocessing is heavy, this extra compute is where the speedup comes from.
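
To illustrate what a preprocessing-heavy workload looks like, the sketch below builds a deliberately CPU-intensive torchvision transform pipeline for FashionMNIST; the specific transforms are illustrative choices, not taken from the benchmark code, and only distribute_mode decides where they run.

from torchvision import datasets, transforms
from Dpex import dataloader

# a deliberately CPU-heavy per-sample pipeline (illustrative choices)
heavy_transform = transforms.Compose([
    transforms.RandomResizedCrop(28, scale=(0.8, 1.0)),
    transforms.RandomRotation(15),
    transforms.GaussianBlur(kernel_size=3),
    transforms.ToTensor(),
])

training_data = datasets.FashionMNIST(root="data", train=True, download=True,
                                      transform=heavy_transform)
# same call as before; distribute_mode=True pushes the transforms onto the Ray cluster
train_loader = dataloader.DpexDataLoader(training_data, distribute_mode=True,
                                         num_workers=10, batch_size=100, shuffle=True)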

4.1 Model accuracy benchmark

We benchmark model training accuracy on the FashionMNIST dataset; see the test files for the code details.

| Accuracy (%) | Loss  | GPU settings | DpexDataLoader (distribute_mode=?) | Epochs | Learning rate | Batch size |
|--------------|-------|--------------|------------------------------------|--------|---------------|------------|
| 90.65        | 0.137 | Single GPU   | True                               | 40     | 0.001         | 100        |
| 91.09        | 0.112 | Single GPU   | False                              | 40     | 0.001         | 100        |
| 90.67        | 0.016 | DataParallel | True                               | 40     | 0.001         | 100        |
| 90.32        | 0.008 | DataParallel | False                              | 40     | 0.001         | 100        |
| 88.98        | 0.034 | DDP          | True                               | 40     | 0.001         | 100        |
| 89.84        | 0.030 | DDP          | False                              | 40     | 0.001         | 100        |

4.2 Training speed benchmark

5. Dependencies

Dpex relies on Ray for cross-machine task scheduling, so to use distributed data preprocessing you first need to set up your training machines as a Ray cluster. For details on building a Ray cluster, refer to the Ray documentation.
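
For reference, a minimal Ray cluster is typically formed with the standard Ray CLI, after which the training process connects to it. The commands and the explicit ray.init call below are a generic sketch (the port and address are placeholders); whether you call ray.init yourself or let DpexDataLoader connect via its head_address parameter depends on how you deploy.

# On the head node (shell):             ray start --head --port=6379
# On every additional worker (shell):   ray start --address='<head-node-ip>:6379'
import ray

ray.init(address="auto")  # the training process joins the same cluster before building the DpexDataLoader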
