Distributed training with PyTorch

Oleg Boiko
5 min readNov 21, 2020

--

Photo by Matt Seymour on Unsplash

In this tutorial, you will learn practical aspects of how to parallelize ML model training across multiple GPUs on a single node. You will also learn the basics of PyTorch’s Distributed Data Parallel framework.

If you are eager to see the code, here is an example of how to use DDP to train MNIST classifier. You can contrast it with the code for the same model trained on a single CPU device.

What is DDP?

DDP is a library in PyTorch which enables synchronization of gradients across multiple devices. What does it mean? It means that you can speed up model training almost linearly by parallelizing it across multiple GPUs. In other words, it will take roughly half the time to train the model on a machine with 2 GPUs vs machine with a single GPU. Many cloud providers, such as AWS and GCP offer multi-GPU machines. For example, ml.p2.8xlarge instances from AWS feature 8 GPUs. It’s not *strictly* linear because there is a small performance overhead associated with gathering tensors from multiple devices.

DDP supports training distributed across multiple separate hosts. Such setup is out of scope for this tutorial.

How does it work?

DDP works by creating a separate Python process for each GPU. Each process is using a non-overlapping subset of the data. Note, at most one DDP process can run on one GPU. That said, it’s possible to have fewer DDP processes than GPUs available on the machine. In this case, some GPUs will remain unused. It is not possible to have more processes than GPUs for a single script.

PyTorch offers tools to spawn multiple processes, as well as to split a dataset into non-overlapping subsets.

If you’re interested to learn more details about the implementation of DDP, feel free to explore DDP design documentation.

Terminology

Before we start, let’s get familiar with a few DDP concepts:

  • world size — number of GPU devices on which training is happening
  • rank — sequential id of a single GPU device. For example, 0

DDP makes rank available to your script as a command line argument. world_size can be obtained via torch.cuda.device_count(), assuming you’d like to utilize all available GPUs.

Prepare the dataset

Let’s start with learning how to use PyTorch to split a dataset into non-overlapping chunks. First, let’s take a look at how to create simple, non-distributed dataset:

# Download and initialize MNIST train dataset
train_dataset = datasets.MNIST('./mnist_data',
download=True,
train=True)
# Wrap train dataset into DataLoader
train_loader = DataLoader(train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=4,
pin_memory=True)

In order to create a distributed data loader, use torch.utils.data.DistributedSampler like this:

# Download and initialize MNIST train dataset
train_dataset = datasets.MNIST('./mnist_data',
download=True,
train=True,
transform=transform)
# Create distributed sampler pinned to rank
sampler = DistributedSampler(train_dataset,
num_replicas=world_size,
rank=rank,
shuffle=True, # May be True
seed=42)
# Wrap train dataset into DataLoader
train_loader = DataLoader(train_dataset,
batch_size=batch_size,
shuffle=False, # Must be False!
num_workers=4,
sampler=sampler,
pin_memory=True)

We create DisstributedSampler and pass it into DataLoader. It’s crucial to set shuffle=False on DataLoader to avoid messing up the subsets. Shuffling is done by the Sampler, so you may want to set shuffle=True there.

Prepare the model

Here is an example of how model can be initialized in single device environment:

def create_model():
model = nn.Sequential(
nn.Linear(28*28, 128), # MNIST images are 28x28 pixels
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, 10, bias=False) # 10 classes to predict
)
return model
# Initialize the model
model = create_model()

In order to make it work in multi-GPU environment, the following modifications are required:

# Initialize the model
model = create_model()
# Create CUDA device
device = torch.device(f'cuda:{rank}')
# Send model parameters to the device
model = model.to(device)
# Wrap the model in DDP wrapper
model = DistributedDataParallel(model, device_ids=[rank], output_device=rank)

Training cycle

Single-device training cycle will have code similar to this:

for i in range(epochs):
for x, y in train_loader:
# do the training
...

In multi-GPU environment, sampler has to know which epoch is this:

for i in range(epochs):
train_loader.sampler.set_epoch(i)
for x, y in train_loader:
# do the training
...

Getting rank from command line arguments

DDP will pass --local-rank parameter to your script. You can parse it like this:

parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int)
args = parser.parse_args()
rank = args.local_rank

Saving the model once

DDP maintains a carbon copy of the model’s parameters on each GPU, so you should only save your model once:

if rank == 0:
torch.save(model.module.state_dict(), 'model.pt')

Launching the script

DDP offers a launching utility, which you can use to spawn multiple processes. If your machine has 4 GPUs available, a command line will look something like this:

python -m torch.distributed.launch --nproc_per_node=4 ddp_tutorial_multi_gpu.py

Note, that --local-rank is not explicitly specified here. It will be added to the command line automatically by PyTorch before calling your script.

Making sure it works

There are several simple indicators which can tell you that your implementation is correct:

  1. Print the length of your data loader. In this example, it would be len(train_loader) . For example, your dataset has 10,000 examples, and batch size is 100. That means that the data loader will have 10,000/100=1,000 batches total. This will be the length of the data loader when only one device is used. However, with DDP this number will be divided by the number of devices. So, if you are using the same dataset and settings, but training on 2 GPUs, the length of the data loader will be 1,000/2=500.
  2. Expect to see logs from each process separately. If you’re logging into a file, it might be a good idea to either add rank to the log message or include rank in the log file name.
  3. Use nvidia-smi command line tool to monitor GPU usage. If your implementation is correct, you should see all GPUs more or less equally utilized

Conclusion

In this tutorial, you learned what Distributed Data Parallel is, and the main components of the training process which uses DDP: distributed sampler for the dataset, DDP model wrapper, launching utility.

Feel free to explore the code and provide feedback: https://github.com/olehb/pytorch_ddp_tutorial/blob/main/ddp_tutorial_multi_gpu.py

--

--

Oleg Boiko
Oleg Boiko

Written by Oleg Boiko

Machine Learning Engineer @ Chegg

Responses (1)