Asynchronous Data Pre-Processing for Multi Modal Models

Hi,

I need to run forward passes over a lot of images with various VLMs like PaliGemma, LLaVA Next, and Qwen2-VL.

At first, I was running a simple for loop across images, something like:

for img, prompt in data:
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "image"},
                {"type": "text", "text": prompt},
            ],
        },
    ]
    text_conversation = processor.apply_chat_template(conversation, add_generation_prompt=True)
    inputs = processor(images=img, text=text_conversation, return_tensors="pt")
    inputs = inputs.to(device)
    outputs = model.generate(**inputs)

However, I noticed that this gives terrible GPU utilization. My first fix was simply to use a larger batch size while still keeping the processor inside the for loop, but that didn't really help: utilization had higher peaks but also longer stretches at 0% in between, presumably because the GPU has to wait for the CPU-side processing to finish.
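Roughly, that second attempt looked like this (just a sketch, with batch_size picked by hand):

for i in range(0, len(data), batch_size):
    batch = data[i:i + batch_size]
    images = [img for img, _ in batch]
    texts = []
    for _, prompt in batch:
        conversation = [
            {"role": "user", "content": [
                {"type": "image"},
                {"type": "text", "text": prompt},
            ]},
        ]
        texts.append(processor.apply_chat_template(conversation, add_generation_prompt=True))
    # the GPU sits idle while this preprocessing runs on the CPU, hence the 0% gaps
    inputs = processor(images=images, text=texts, padding=True, return_tensors="pt").to(device)
    outputs = model.generate(**inputs)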

My current solution uses a standard torch Dataset and DataLoader, which lets the CPU prepare the next batch while the current one is being processed on the GPU:

import torch
from torch.utils.data import Dataset
from torchvision.datasets.folder import pil_loader
from transformers import BatchFeature, DataCollatorForLanguageModeling
from typing import Dict, List


# ProcessorFunction: the model-dependent callable described below, defined elsewhere in my code
class DefaultEvaluatorDataset(Dataset):
    def __init__(self, data_dicts: List[Dict], processor_function: ProcessorFunction):
        super().__init__()
        self.data_dicts = data_dicts
        assert len(data_dicts) > 0
        assert 'image_path' in data_dicts[0]
        assert 'prompt' in data_dicts[0]
        self.processor_function = processor_function

    def __getitem__(self, index):
        data_dict = self.data_dicts[index]
        img_path = data_dict['image_path']
        img = pil_loader(img_path)
        prompt = data_dict['prompt']

        inputs = self.processor_function(img, prompt)
        return inputs

    def __len__(self) -> int:
        return len(self.data_dicts)


class DefaultCollate:
    def __init__(self, tokenizer):
        self.llm_collator = DataCollatorForLanguageModeling(tokenizer, mlm=False)

    def __call__(self, batch):
        pixel_values = []
        for indiv_inputs in batch:
            pixel_values.append(indiv_inputs.data.pop('pixel_values'))

        # this will break if pixel_values have different shapes; in that case, use batch_size = 1
        pixel_values = torch.stack(pixel_values, dim=0)
        inputs = self.llm_collator(batch)
        inputs.data['pixel_values'] = pixel_values
        inputs = BatchFeature(inputs)
        return inputs

dataloader = torch.utils.data.DataLoader(
    dataset,
    shuffle=shuffle,
    collate_fn=collate_fn,
    batch_size=batch_size,
    num_workers=num_workers,
)

for inputs in dataloader:
    inputs = inputs.to(model.device, model.dtype)
    outputs = model.generate(**inputs, **generation_kwargs)
    ...

processor_function here is simply a model-dependent function that takes care of the chat template and then calls the model's processor.
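For concreteness, such a per-example processor_function could look roughly like this (a sketch based on the loop at the top of the post; the exact chat-template handling depends on the model, and processor is assumed to be the model's processor as above):

def processor_function(img, prompt):
    # model-dependent part: wrap the prompt in the chat template...
    conversation = [
        {"role": "user", "content": [
            {"type": "image"},
            {"type": "text", "text": prompt},
        ]},
    ]
    text = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # ...then call the model's processor on a single image/text pair
    return processor(images=img, text=text, return_tensors="pt")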

This works nicely for models where you can stack the pixel_values. However, for some models like LLaVA Next, the shape of pixel_values depends on the input resolution, so at batch size > 1 this code breaks. To handle that, I would have to call the processor on the entire batch at once instead of processing each example individually and then collating the results. However, that doesn't fit nicely with the torch Dataset class, which is built around processing single examples at a time.
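The batch-level call I mean would look roughly like this (a sketch; batch_dicts and build_chat_text are placeholders for a list of my data dicts and the chat-template helper above):

# let the processor itself handle padding and per-image patch counts
images = [pil_loader(d['image_path']) for d in batch_dicts]
texts = [build_chat_text(d['prompt']) for d in batch_dicts]
inputs = processor(images=images, text=texts, padding=True, return_tensors="pt")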

Is there any known solution for this kind of asynchronous processing that is not too model-dependent? I don't want to reimplement half of the processor just to get a proper collate, especially since models like LLaVA Next have quite complex processing pipelines.

Does huggingface datasets offer a solution for this?

Thank you!


As long as it can be packed into a batch object, the existing HF framework can take care of it to some extent, but if the preprocessing part isn't shared across models, it's tough…
Also, Python has strict limits on parallelizing the parts that are written in pure Python, so I think it's better to look for an approach that relies on some external library anyway.

Yes, it's a bit awkward to parallelize across more than one process while still calling the processor on a whole bunch of examples at once (so that I don't have to handle the collate manually).

I ended up with this:

import torch
from torchvision.datasets.folder import pil_loader
from datasets import Dataset

dataset = Dataset.from_dict(...)
dataset = dataset.to_iterable_dataset()

def load_image(row):
    # map() on an iterable dataset is lazy, so images are only loaded when the row is consumed
    img = pil_loader(row['image_path'])
    row['image'] = img
    return row

dataset = dataset.map(load_image)

def apply_processor_and_batch(rows):
    # run the model's processor on a whole batch of rows at once,
    # then wrap the result so each "row" of the mapped dataset is one ready-made batch
    processed = processor_function(rows)
    new_row = {'batch': [processed]}
    return new_row

dataset = dataset.map(apply_processor_and_batch, batched=True, batch_size=batch_size,
                      remove_columns=['image_path', 'prompt', 'image'])

def collate_batches(batches):
    # batch_size=1 in the DataLoader, so just unwrap the single pre-built batch
    assert len(batches) == 1
    return batches[0]['batch']

dataloader = torch.utils.data.DataLoader(
    dataset,
    collate_fn=collate_batches,
    shuffle=shuffle,  # must stay False/None for an iterable dataset; shuffle via dataset.shuffle() instead
    batch_size=1,
    num_workers=1,
)

where the processor function for Llama 3.2 Instruct would look like this (it might differ per model):

    def processor_function(self, batch: Dict, *args, **kwargs):
        # build the chat-templated text for every prompt in the batch...
        conversations = []
        for prompt in batch['prompt']:
            conversation = [
                {"role": "user", "content": [
                    {"type": "image"},
                    {"type": "text", "text": prompt}
                ]}
            ]
            text_conversation = processor.apply_chat_template(conversation, add_generation_prompt=True)
            conversations.append(text_conversation)

        # ...then let the processor handle all images and texts of the batch in one call
        inputs = processor(images=batch['image'], text=conversations, add_special_tokens=False, return_tensors="pt")
        return inputs

This is parallel because a DataLoader with num_workers=1 prepares the next batch in a separate worker process, but all images of a batch are still handled by that single worker. Since VLMs don't use very large batches and the processing times are quite high anyway, this doesn't seem to be a huge issue right now.


Was it the Python GIL constraint? As long as that's in place, you can't really speed things up with Python code alone the way you could in C or assembler.
The details of parallelization and async also change drastically even between Python 3.9 and 3.11, so I'd treat rolling our own parallelism as a last resort.

Even if it isn't pretty, it seems smarter to somehow prepare the data so that an existing library like this can pack it up and take it from there.

No, afaik the GIL isn't a problem with the torch DataLoader, or at least that's what the docs say:

A DataLoader uses single-process data loading by default.
Within a Python process, the Global Interpreter Lock (GIL) prevents true fully parallelizing Python code across threads. To avoid blocking computation code with data loading, PyTorch provides an easy switch to perform multi-process data loading by simply setting the argument num_workers to a positive integer.

But I do have to say that I don’t know how DataLoader is implemented under the hood.

But you cannot use Hugging Face Datasets with more than 1 worker unless you use sharding; otherwise it will automatically set the number of worker processes to 1.
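If you do want more than one worker, the iterable dataset has to be created with multiple shards so that each DataLoader worker gets its own subset, roughly like this (the shard and worker counts are just an example):

dataset = Dataset.from_dict(...).to_iterable_dataset(num_shards=4)
dataloader = torch.utils.data.DataLoader(
    dataset,
    collate_fn=collate_batches,
    batch_size=1,
    num_workers=4,  # should not exceed num_shards, or the extra workers stay idle
)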

Eh… so we have to set that up manually.
Well, I guess we should be thankful that torch provides a wrapper.
I don't want to have to write code while worrying about the Python version.
https://pytorch.org/docs/stable/notes/multiprocessing.html