Problem with a hanging process at the end when using DataLoaders on each process

I am trying to get accelerate working on a video task and I am running into problems with processes getting stuck.

Here’s a brief summary of my problem: I have multiple directories, each containing many (up to a thousand) image frames. Because loading all frames for a batch of videos at once is not possible due to memory constraints, I am trying to iteratively encode a batch of videos with a resnet and feed the cached embeddings to a sequence model. I also want to fine-tune the encoder, so precomputing the embeddings is not possible.

My thinking goes like this:

  1. Get a list of paths to all video directories.
  2. Distribute subsets of the paths evenly among all available GPUs.
  3. On each GPU we then sequentially loop over its subset of paths and:
    3.1 for each path to a video directory, create a dataset and dataloader,
    3.2 iteratively encode batches from this loader with the partially frozen resnet and store the results in a cache,
    3.3 finally aggregate the caches for a given batch, pad all image sequences to the same length and feed the resulting batch to a sequence model.

Here’s the code that I use:

import torch
from torchvision.models import resnet18
from accelerate import Accelerator
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
from torchvision import transforms
from tqdm import tqdm
from PIL import Image

def chunker(seq, size):
    """chunk given sequence into batches of given size"""
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

class ImageDataset(Dataset):
    def __init__(self, img_dir):
        super().__init__()
        self.img_paths = list(img_dir.rglob('*.jpg'))

        self.transform = transforms.Compose([
            transforms.Resize(224),
            transforms.ToTensor(),
            transforms.Normalize(
                (0.485, 0.456, 0.406), 
                (0.229, 0.224, 0.225)),
        ])

    def __getitem__(self, idx):
        path = self.img_paths[idx]
        img = self.transform(Image.open(path))
        return img
        
    def __len__(self):
        return len(self.img_paths)


def main():

    torch.multiprocessing.set_sharing_strategy('file_system') # resolves 'too many open files' error
    
    accelerator = Accelerator(fp16=True, device_placement=False)

    # partially freeze network: freeze all pretrained weights, then replace the final
    # fc layer (the parameters of the new Linear are trainable by default)
    model = resnet18(pretrained=False)
    for param in model.parameters():
        param.requires_grad = False
    model.fc = torch.nn.Linear(model.fc.in_features, 256)
    
    model.to(accelerator.device)
    model = accelerator.prepare_model(model)

    # 1. Get a list of paths to all image directories.
    data_path = Path('/path/to/data_root/')
    img_dirs = list(data_path.glob('*')) 
    
    # 2. Distribute subsets of the paths evenly among all available GPUs
    n_dirs = len(img_dirs)
    split_size = n_dirs // accelerator.num_processes
    img_dirs = img_dirs[accelerator.process_index*split_size : (accelerator.process_index+1)*split_size]
    
    # just use the first chunk (a single video directory) for testing, in practice we would loop over all chunks
    img_dir_batch = list(chunker(img_dirs, 1))[0] 

    states = [] # container to collect outputs
    for img_dir in img_dir_batch:

        # 3.1 create dataset and loader for current video
        ds = ImageDataset(img_dir)
        dl = DataLoader(ds, batch_size=16, num_workers=1)
        
        # 3.2 iteratively encode and cache frames
        outs = []
        progress = tqdm(dl, disable=not accelerator.is_local_main_process)
        for img in progress:
            torch.cuda.empty_cache() # free memory
            out = model(img.to(accelerator.device))
            outs.append(out)
        outs = torch.cat(outs,dim=0)
        states.append(outs)

    # 3.3 aggregate batch containing multiple videos
    # here we would then zero pad `states` into a single batch such that sequences have same length
    # next we feed the batch into another model
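    # for example (just a sketch): zero-pad the variable-length embedding sequences into one batch
    padded = torch.nn.utils.rnn.pad_sequence(states, batch_first=True)  # (n_videos, max_len, 256)
    lengths = torch.tensor([s.shape[0] for s in states])  # original lengths, e.g. for masking
    # the padded batch would then be fed to the downstream model, e.g.
    # seq_out = sequence_model(padded)  # `sequence_model` is a placeholder, not defined in this snippet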

if __name__=="__main__":
    main()
    print('done.')

This seems to work and the first (out of two) processes finishes fine, i.e. it reaches the print statement at the end. The second process, however, completes the encoding stage and then just hangs indefinitely.

I am using a single machine with 2 GPUs.

Thank you all, any help is appreciated 🙂

It only seems to work when I wrap the forward call in a with model.no_sync(): block and set the number of workers to 0. This makes the loading very slow, however. And when I try to use a config with DeepSpeed, it throws an error that the model object has no attribute no_sync.
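
Roughly, the working variant of the encoding loop above looks like this (just a sketch, using the same ds, model and accelerator as in the full script):

dl = DataLoader(ds, batch_size=16, num_workers=0)  # num_workers > 0 still hangs
progress = tqdm(dl, disable=not accelerator.is_local_main_process)

outs = []
with model.no_sync():  # skip DDP gradient synchronization for these forward passes
    for img in progress:
        out = model(img.to(accelerator.device))
        outs.append(out)
outs = torch.cat(outs, dim=0)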

It looks like it might be a gradient sync problem, but you’re not using any gradients. Would putting the model call inside a torch.no_grad() block help?

Thanks for the quick reply. I tried it now, but unfortunately the problem persists with torch.no_grad(). Wrapping the model call in no_grad() only works when I set the number of workers in the dataloader to 0; if the number of workers is greater than 0, the process hangs again.

That is weird, but then it looks like an issue in PyTorch multiprocessing: setting num_workers to 0 means the DataLoader does not create any new worker processes.
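
In other words, the only difference between the two settings is (just an illustration):

dl = DataLoader(ds, batch_size=16, num_workers=0)  # batches are loaded in the main process
dl = DataLoader(ds, batch_size=16, num_workers=1)  # a separate worker process is spawned and has to shut down cleanly at the end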

Do you have the issue if you use classic PyTorch DDP or just Accelerate?
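
For instance, a minimal DDP-only version (just a sketch, launched with torchrun --nproc_per_node=2 and reusing the ImageDataset and per-directory loop from your script) would show whether the hang is specific to Accelerate:

import os
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torchvision.models import resnet18

dist.init_process_group('nccl')  # rank and world size come from the torchrun environment
local_rank = int(os.environ['LOCAL_RANK'])
torch.cuda.set_device(local_rank)

model = resnet18(pretrained=False)
model.fc = torch.nn.Linear(model.fc.in_features, 256)
model = DDP(model.cuda(local_rank), device_ids=[local_rank])

# ... same per-directory ImageDataset / DataLoader encoding loop as in your script ...

dist.destroy_process_group()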