English | 简体中文

Easy Parallel Library

Overview

Easy Parallel Library (EPL) is a general and efficient library for distributed model training.

  • Usability - Users can implement different parallelism strategies with a few lines of annotations, including data parallelism, pipeline parallelism, tensor model parallelism, and their hybrids.
  • Memory Efficient - EPL provides various memory-saving techniques, including gradient checkpointing, ZeRO, and CPU offload, so users can train larger models with fewer computing resources (see the configuration sketch after this list).
  • High Performance - EPL provides an optimized communication library to achieve high scalability and efficiency.
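
As an illustration of how these memory-saving options are enabled, the sketch below combines configuration keys that appear verbatim in the issue reports later on this page (gradient_checkpoint.type, zero.level, amp.level, amp.loss_scale). Treat it as a hedged example and check the docs for the authoritative option names and values.

    import epl

    # Hedged sketch: keys taken from user reports below; consult the EPL docs
    # for the full list of supported options.
    config = epl.Config({
        "gradient_checkpoint.type": "auto",  # recompute activations to save memory
        "zero.level": "v1",                  # ZeRO-style optimizer-state partitioning
        "amp.level": "O1",                   # automatic mixed precision
        "amp.loss_scale": 128,               # loss scale used with AMP
    })
    epl.init(config)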

For more information, you may read the docs.

EPL Model Zoo provides end-to-end parallel training examples.

Installation

To install EPL, please refer to the following instructions.

Examples

Here are a few examples of different parallelism strategies achieved by changing only the annotations. Please refer to the API documentation for details and to the tutorials for more examples.

Data Parallelism

The following example shows a basic data parallelism annotation. The data parallelism degree is determined by the number of allocated GPUs.

+ import epl
+ epl.init()
+ with epl.replicate(device_count=1):
    model()
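
To make the annotation concrete, here is a minimal, hedged sketch with a toy TensorFlow 1.x model; only epl.init() and epl.replicate() come from the example above, and everything else is an illustrative placeholder.

    import epl
    import tensorflow as tf

    epl.init()
    with epl.replicate(device_count=1):
        # Toy model: one dense layer over random features (placeholder).
        features = tf.random.uniform([32, 128])
        labels = tf.random.uniform([32], maxval=10, dtype=tf.int32)
        logits = tf.layers.dense(features, 10)
        loss = tf.reduce_mean(
            tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=logits))
        train_op = tf.train.AdamOptimizer().minimize(
            loss, global_step=tf.train.get_or_create_global_step())

    # Launched with N GPUs, EPL replicates this model N times (data parallelism).
    with tf.train.MonitoredTrainingSession() as sess:
        for _ in range(10):
            sess.run(train_op)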

Pipeline Parallelism

The following example shows pipeline parallelism with two pipeline stages, each computed on one GPU. If the total number of GPUs is 4, EPL automatically applies two-degree data parallelism over the model pipeline.

+ import epl
+ 
+ config = epl.Config({"pipeline.num_micro_batch": 4})
+ epl.init(config)
+ with epl.replicate(device_count=1, name="stage_0"):
    model_part1()
+ with epl.replicate(device_count=1, name="stage_1"):
    model_part2()
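
A practical detail when sizing batches: each pipeline replica's per-step batch is divided into pipeline.num_micro_batch micro batches. The sketch below, adapted from a user report later on this page, shows one way to derive the per-micro-batch size from a global batch size; the exact division scheme is that report's assumption, not something guaranteed by EPL.

    import epl

    config = epl.Config({"pipeline.num_micro_batch": 4})
    epl.init(config)

    batch_size = 256       # assumed global batch size
    num_pipe_stages = 2    # stages annotated as above
    env = epl.Env.get()
    total_devices = len(env.cluster.available_devices)
    num_replica = total_devices // num_pipe_stages
    micro_batch = batch_size // env.config.pipeline.num_micro_batch // num_replica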

Tensor Model Parallelism

The following example shows a tensor model parallelism annotation. Data parallelism is applied to the ResNet part, and tensor model parallelism to the classification part.

+ import epl
+ config = epl.Config({"cluster.colocate_split_and_replicate": True})
+ epl.init(config)
+ with epl.replicate(8):
    ResNet()
+ with epl.split(8):
    classification()
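
The split/replicate scopes can also be combined with a process-wide default strategy, as done in the user reports later on this page: epl.set_default_strategy() makes un-annotated operations data parallel, while an explicit epl.split() scope shards a large layer. The sketch below is illustrative only; the layer shapes are placeholders.

    import epl
    import tensorflow as tf

    config = epl.Config({"cluster.colocate_split_and_replicate": True})
    epl.init(config)

    # Un-annotated ops fall back to data parallelism.
    epl.set_default_strategy(epl.replicate(device_count=1))

    features = tf.random.uniform([32, 1024])   # placeholder backbone output
    hidden = tf.layers.dense(features, 1024)   # replicated by the default strategy

    with epl.split(8):
        # Large classification layer sharded across 8 GPUs.
        logits = tf.layers.dense(hidden, 100000)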

Publication

If you use EPL in your publication, please cite it by using the following BibTeX entry.

@misc{jia2021whale,
      title={Whale: Scaling Deep Learning Model Training to the Trillions}, 
      author={Xianyan Jia and Le Jiang and Ang Wang and Jie Zhang and Xinyuan Li and Wencong Xiao and Langshi Chen and Yong Li and Zhen Zheng and Xiaoyong Liu and Wei Lin},
      year={2021},
      eprint={2011.09208},
      archivePrefix={arXiv},
      primaryClass={cs.DC}
}

Contact Us

Join the Official Discussion Group on DingTalk.


Comments
  • Gradient Checkpoint with auto type got a TypeError

    After I added the code below to my working model function, I got a TypeError:

    epl_config = epl.Config({
        "gradient_checkpoint.type": "auto",
        "zero.level": "v1",
        "amp.level": "O1", "amp.loss_scale": 128
    })
    epl.init(epl_config)
    epl.set_default_strategy(epl.replicate(1))
    

    error info:

    "/venv/lib/python2.7/site-packages/tensorflow/contrib/graph_editor/util.py", line 214, in get_unique_graph
        t) for t in check_types]), type(op)))
    TypeError: Expected a type in (<class 'tensorflow.python.framework.ops.Tensor'>), got: <class 'tensorflow.python.ops.resource_variable_ops.Resource
    

    My working code (the modeling module comes from robert):

    import epl
    import tensorflow as tf
    from tensorflow.contrib import layers, metrics
    
    epl_config = epl.Config({
        "gradient_checkpoint.type": "auto",
        "zero.level": "v1",
        "amp.level": "O1", "amp.loss_scale": 128
    })
    epl.init(epl_config)
    epl.set_default_strategy(epl.replicate(1))
    bert_path = 'robert_checkpoint_path'
    
    
    def model_fn(features, labels, mode, params):
        is_train_bool = mode == tf.estimator.ModeKeys.TRAIN
    
        # Building BERT model
        bert_config = modeling.BertConfig.from_dict({
            "attention_probs_dropout_prob": 0.1,
            "directionality": "bidi",
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 768,
            "initializer_range": 0.02,
            "intermediate_size": 3072,
            "max_position_embeddings": 512,
            "num_attention_heads": 12,
            "num_hidden_layers": 6,
            "pooler_fc_size": 768,
            "pooler_num_attention_heads": 12,
            "pooler_num_fc_layers": 3,
            "pooler_size_per_head": 128,
            "pooler_type": "first_token_transform",
            "type_vocab_size": 2,
            "vocab_size": 21128
        })
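        # NOTE: input_ids, input_mask, and segment_ids are assumed to come from
        # `features`; their extraction is omitted in this excerpt.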
        bert = modeling.BertModel(
            config=bert_config,
            is_training=is_train_bool,
            input_ids=input_ids,
            input_mask=input_mask,
            token_type_ids=segment_ids,
            use_one_hot_embeddings=False,
            scope='bert'
        )
        # Getting BERT's outputs
        bert_output = bert.get_sequence_output()
        # Loading pre-trained BERT
        if is_train_bool:
            tvars = tf.trainable_variables()
            (
                assignment_map, initialized_names
            ) = modeling.get_assignment_map_from_checkpoint(
                tvars, bert_path
            )
            tf.train.init_from_checkpoint(bert_path, assignment_map)
            tf.logging.info("**** Trainable Variables ****")
            for var in tvars:
                tf.logging.info("  name = {}, shape = {}{}".format(
                    var.name, var.shape,
                    ", *INIT_FROM_CKPT*" if var.name in initialized_names
                    else ''
                ))
    
        with tf.variable_scope("network"):
            # MLP
            first_hidden_layer = tf.layers.dense(
                tf.concat(bert_output, axis=1), 128, activation=tf.nn.relu)
            second_hidden_layer = tf.layers.dense(
                first_hidden_layer, 128, activation=tf.nn.relu)
            logits = tf.layers.dense(second_hidden_layer, 1)
            predictions = tf.sigmoid(logits)
    
        predictions = tf.identity(predictions, name="predict")
    
        if mode == tf.estimator.ModeKeys.PREDICT:
            return tf.estimator.EstimatorSpec(
                mode=mode, predictions={
                    "predict": predictions,
                    'label': features['label'],
                }
            )
        labels = tf.reshape(labels, [-1, 1])
        loss = tf.losses.sigmoid_cross_entropy(labels, logits)
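        # EPL hook: register the loss in GraphKeys.GLOBAL_MEAN_OBJECTS (presumably
        # so EPL averages it globally across replicas).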
        epl.add_to_collection(loss, epl.GraphKeys.GLOBAL_MEAN_OBJECTS)
    
        optimizer = tf.train.AdamOptimizer()
        train_op = optimizer.minimize(loss=loss,
                                      global_step=tf.train.get_global_step())
        predictions = tf.reshape(predictions, [-1, 1])
        eval_metric_ops = {
            "auc": tf.metrics.auc(labels, predictions),
            "f1": metrics.f1_score(labels, predictions),
            "precision": tf.metrics.precision_at_thresholds(
                labels, predictions, [0.5]
            ),
            "recall": tf.metrics.recall_at_thresholds(
                labels, predictions, [0.5]
            )
        }
    
        return tf.estimator.EstimatorSpec(
            mode=mode,
            loss=loss,
            predictions={"predict": predictions},
            train_op=train_op,
            eval_metric_ops=eval_metric_ops)
    
    opened by RussellZZ 1
  • During training, every worker except the chief resets its step to 0 after each checkpoint save, and the whole process hangs after the second checkpoint save

    Code:

    """Run downstream classification"""
    
    from __future__ import absolute_import
    from __future__ import division
    from __future__ import print_function
    
    import os
    import tensorflow as tf
    import utils.optimizer as optimizer
    import epl
    
    FLAGS = tf.flags.FLAGS
    
    tf.flags.DEFINE_integer("task_index", None, "Worker or server index")
    tf.flags.DEFINE_string("worker_hosts", "", "worker hosts")
    
    tf.flags.DEFINE_string("buckets", "", "tables info")
    tf.flags.DEFINE_string("train_table", "", "tables info")
    tf.flags.DEFINE_string("val_table", "", "tables info")
    tf.flags.DEFINE_string("checkpoint_dir", '',
                           """Path to checkpoint folder""")
    
    tf.flags.DEFINE_integer("num_epochs", 100,
                            """Number of training epochs (default: 20)""")
    tf.flags.DEFINE_integer("max_steps", 10000, "")
    tf.flags.DEFINE_integer("batch_size", 256, """Batch size (default: 64)""")
    tf.flags.DEFINE_integer("display_step", 200,
                            """Number of steps to display log into TensorBoard (default: 20)""")
    tf.flags.DEFINE_integer("save_checkpoints_steps", 1000,
                            "How often to save the model checkpoint.")
    tf.flags.DEFINE_float("learning_rate", 0.001,
                          """Learning rate (default: 0.0005)""")
    tf.flags.DEFINE_float("max_grad_norm", 5.0,
                          """Maximum value of the global norm of the gradients for clipping (default: 5.0)""")
    
    tf.flags.DEFINE_integer("num_pipe_stages", 1, "number of pipeline stages")
    tf.flags.DEFINE_integer("num_micro_batch", 1, "number of pipeline micro batches")
    
    
    def str2list(str_in, shape, separator=' ', dtype=tf.int32):
        data = tf.string_split([str_in], separator)
        data = tf.string_to_number(data.values, dtype)
        return tf.reshape(data, shape)
    
    
    def file_based_input_fn_builder(input_file, slice_id, slice_count, is_training, drop_remainder):
        """Creates an `input_fn` closure to be passed to TPUEstimator."""
        def _decode_record(*record):
            """Decodes a record to a TensorFlow example."""
            (cert_no, coll_case_no, embedding, dt, label) = record
    
            embedding = str2list(embedding, shape=[512], separator='\002', dtype=tf.float32)
    
            example = {'input_embed': embedding,
                       'label': label,
                       'dt': dt,
                       'cert_no': cert_no,
                       'coll_case_no': coll_case_no}
            return example
    
        def input_fn(params):
            """The actual input function."""
            d = tf.data.TableRecordDataset([input_file], record_defaults=['', '', '', '', 0])
            if is_training:
                d = d.repeat(FLAGS.num_epochs)
                d = d.shuffle(buffer_size=1000)
    
            d = d.apply(tf.contrib.data.map_and_batch(
                            lambda v1, v2, v3, v4, v5: _decode_record(v1, v2, v3, v4, v5),
                            batch_size=FLAGS.batch_size,
                            drop_remainder=drop_remainder))
            return d
    
        return input_fn
    
    
    def create_model(input_embed, label):
    
        with tf.variable_scope("loss", reuse=tf.AUTO_REUSE):
            with tf.variable_scope("cls"):            
                logits = tf.layers.dense(
                    input_embed,
                    2,
                    activation=None,
                    kernel_initializer=tf.truncated_normal_initializer())
    
            one_hot_label = tf.one_hot(label, depth=2, dtype=tf.float32)
            loss = tf.losses.softmax_cross_entropy(one_hot_label, logits)
            probs = tf.nn.softmax(logits, axis=-1)
            predict = tf.argmax(probs, axis=-1, output_type=tf.int32)
    
            acc = tf.metrics.accuracy(label, predict)
            auc = tf.metrics.auc(label, probs[:,-1])
            return (loss, acc, auc)
    
    
    def model_fn_builder(checkpoint_dir, learning_rate):
        """Returns `model_fn` closure for TPUEstimator."""
        def model_fn(features, mode):
            """The `model_fn` for Estimator."""
    
            input_embed = features['input_embed']
            label = features["label"]
    
            # create loss
            (loss, acc, auc) = create_model(input_embed, label)
    
            output_spec = None
            if mode == tf.estimator.ModeKeys.TRAIN:
                #rms optimizer
                tvars = tf.trainable_variables()
                grads = tf.gradients(loss, tvars)
                clipped_grads, global_norm = tf.clip_by_global_norm(grads, FLAGS.max_grad_norm)
                tf.summary.scalar('global_grad_norm', global_norm)
    
                global_step = tf.train.get_or_create_global_step()
                optimizer = tf.train.RMSPropOptimizer(learning_rate)
                train_op = optimizer.apply_gradients(zip(clipped_grads, tvars),
                                                name='train_op',
                                                global_step=global_step)
    
                output_spec = tf.estimator.EstimatorSpec(
                    mode=mode,
                    loss=loss,
                    train_op=train_op)
    
            elif mode == tf.estimator.ModeKeys.EVAL:
                output_spec = tf.estimator.EstimatorSpec(
                    mode=mode,
                    loss=loss,
                    eval_metric_ops={'Accuracy':acc, "AUC":auc})
            else:
                raise ValueError("Only TRAIN and EVAL modes are supported: %s" % (mode))
    
            return output_spec
    
        return model_fn
    
    def main(_):
        tf.logging.set_verbosity(tf.logging.INFO)
        tf.logging.info("############## Start #####################")
        checkpoint_dir = os.path.join(FLAGS.buckets, FLAGS.checkpoint_dir)
        train_file = FLAGS.train_table
        val_file = FLAGS.val_table
    
        worker_spec = FLAGS.worker_hosts.split(",")
        worker_count = len(worker_spec)
        task_index = FLAGS.task_index
    
        epl_env = epl.Env.get()
        total_device = len(epl_env.cluster.available_devices)
        num_replica = total_device // FLAGS.num_pipe_stages
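        # Per-replica micro-batch = batch_size / num_micro_batch / num_replica,
        # assuming FLAGS.batch_size is the global batch size.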
        micro_batch = FLAGS.batch_size // epl_env.config.pipeline.num_micro_batch
        micro_batch = micro_batch // num_replica
      
        print("total_batch: {}, num_micro_batch: {}, num_replica: {}, micro_batch: {}".format(
                FLAGS.batch_size,
                epl_env.config.pipeline.num_micro_batch,
                num_replica,
                micro_batch))
        print("task_index:", task_index)
        print("total_device:", total_device)
    
        model_fn = model_fn_builder(checkpoint_dir, FLAGS.learning_rate)
    
        train_input_fn = file_based_input_fn_builder(
            input_file=train_file,
            slice_id=task_index,
            slice_count=worker_count,
            is_training=True,
            drop_remainder=True
        )
    
        val_input_fn = file_based_input_fn_builder(
            input_file=val_file,
            slice_id=task_index,
            slice_count=worker_count,
            is_training=False,
            drop_remainder=False
        )
    
        sess_config = tf.ConfigProto(allow_soft_placement=True)
        config = tf.estimator.RunConfig(session_config=sess_config,
                                        save_checkpoints_steps=FLAGS.save_checkpoints_steps)
    
        estimator = tf.estimator.Estimator(
                    model_fn=model_fn,
                    config=config,
                    model_dir=checkpoint_dir)
    
        train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=FLAGS.max_steps)
        eval_spec = tf.estimator.EvalSpec(input_fn=val_input_fn, start_delay_secs=6, throttle_secs=1)
        tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
        tf.logging.info("#################  All process done.  ########################")
    
    
    if __name__ == '__main__':
        env_dist = os.environ
        print(env_dist.get('TF_CONFIG'))
        config_json = {}
        config_json["pipeline.num_micro_batch"] = FLAGS.num_micro_batch
        epl.init(epl.Config(config_json))
        if FLAGS.num_pipe_stages == 1:
            epl.set_default_strategy(epl.replicate(device_count=1))
        tf.app.run()
    

    Training job submission command (PAI):

    
    pai -name tensorflow1120_py3
    -Dscript="***/resources/***.tar.gz"
    -DentryFile="train_downstream_cls.py"
    -Dbuckets="***"
    -DuserDefinedParameters="--num_epochs=10 --max_steps=100000 --buckets=*** --checkpoint_dir=*** --train_table=*** --val_table=*** "
    -Dtables="***, ***"
    -Dcluster="{\"worker\":{\"count\":8,\"cpu\":400,\"gpu\":100}}"
    
    opened by walkingwindy 1
  • bug fix

    • Fix ODPS table io_slicing type error.
    • Fix restoring checkpoint in distributed evaluation.
    • Fix hang when enabling amp dynamic loss scale and gradient accumulation.
    opened by SeaOfOcean 0
  • Problem with data parallel mode: the program didn't end when it reached the global step limit

    I ran into a problem when using EPL's data parallel mode. The number of workers is set to 3, and each worker has its own TFRecord input and model save_dir. The global step limit is set to 3500 for each worker. Training looks normal while the global step is below 3500, but the program does not terminate once it reaches 3500. It seems that chief worker 0 does not know that the other workers have finished.

    opened by Jimmy-jin 1