How to Train a Model with PyTorch Lightning and Hugging Face Datasets

I have an image classification dataset that I want to preprocess and train on using PyTorch Lightning.

What is the appropriate way to use Dataset.set_transform() and PyTorch DataLoaders to

  1. parallelize the preprocessing
  2. run Trainer.fit()

My training script:

import torch
import torchvision.transforms as transforms
from datasets import load_dataset
from efficientnet_pytorch import EfficientNet
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
import torchmetrics
from pytorch_lightning import Trainer


# Data Pipeline
dataset = load_dataset("ares1123/celebrity_dataset")
dataset = dataset.rename_column("image", "input")
num_classes = len(set(dataset["train"]["label"]))
class_names = dataset["train"].features
split_dataset = dataset['train'].train_test_split(test_size=0.25)
split_dataset["validation"] = split_dataset.pop("test")


preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((256, 256)),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

def transform_batched(examples):
    examples["input"] = [preprocess(image) for image in examples["input"]]
    return examples

split_dataset.set_transform(transform_batched)

batch_sizes = (32, 8)
train_dataloader = DataLoader(split_dataset['train'], batch_size=batch_sizes[0], shuffle=True)
val_dataloader = DataLoader(split_dataset['validation'], batch_size=batch_sizes[1])

# Model Definition
class ImageClassifier(pl.LightningModule):
    def __init__(self, num_classes):
        super().__init__()
        # Define model and change the classifier layer
        self.model = EfficientNet.from_pretrained('efficientnet-b0', num_classes=num_classes)
        self.criterion = nn.CrossEntropyLoss()
        # Lightning moves these metric modules to the right device
        # automatically, so no manual .to(device) is needed
        self.val_accuracy = torchmetrics.Accuracy(task='multiclass',
                                                  num_classes=num_classes)
        self.val_precision = torchmetrics.Precision(task='multiclass',
                                                    num_classes=num_classes,
                                                    average='macro')
        self.val_recall = torchmetrics.Recall(task='multiclass',
                                              num_classes=num_classes,
                                              average='macro')

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        images, labels = batch
        outputs = self(images)
        loss = self.criterion(outputs, labels)
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        images, labels = batch
        outputs = self(images)
        loss = self.criterion(outputs, labels)

        preds = torch.argmax(outputs, dim=1)
        self.val_accuracy.update(preds, labels)
        self.val_precision.update(preds, labels)
        self.val_recall.update(preds, labels)

        self.log('val_loss', loss, prog_bar=True)

    def on_validation_epoch_end(self):
        # Log metrics at the end of the epoch
        self.log('val_accuracy', self.val_accuracy.compute(), prog_bar=True)
        self.log('val_precision', self.val_precision.compute(), prog_bar=True)
        self.log('val_recall', self.val_recall.compute(), prog_bar=True)

        # Reset metrics at the end of each epoch
        self.val_accuracy.reset()
        self.val_precision.reset()
        self.val_recall.reset()

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

# Training pipeline using PyTorch Lightning
model = ImageClassifier(num_classes=num_classes)

# Checkpoint callback to save the best model
checkpoint_callback = ModelCheckpoint(monitor='val_loss', dirpath='checkpoints/', filename='best-checkpoint', save_top_k=1, mode='min')
early_stopping_callback = EarlyStopping(monitor='val_loss', patience=5)
trainer = Trainer(callbacks=[checkpoint_callback, early_stopping_callback], max_epochs=50, accelerator="gpu")

# Start training
trainer.fit(model, train_dataloader, val_dataloader)

This raises the error:

INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name          | Type                | Params
------------------------------------------------------
0 | model         | EfficientNet        | 5.3 M 
1 | criterion     | CrossEntropyLoss    | 0     
2 | val_accuracy  | MulticlassAccuracy  | 0     
3 | val_precision | MulticlassPrecision | 0     
4 | val_recall    | MulticlassRecall    | 0     
------------------------------------------------------
5.3 M     Trainable params
0         Non-trainable params
5.3 M     Total params
21.139    Total estimated model params size (MB)
Loaded pretrained weights for efficientnet-b0
Sanity Checking DataLoader 0:   0%
 0/2 [00:00<?, ?it/s]
dict_keys(['input', 'label'])
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-42-7d936e2318e9> in <cell line: 22>()
     20 
     21 # Start training
---> 22 trainer.fit(model, train_dataloader, val_dataloader)

25 frames
/usr/local/lib/python3.10/dist-packages/torch/nn/functional.py in pad(input, pad, mode, value)
   4493                     input, pad
   4494                 )
-> 4495     return torch._C._nn.pad(input, pad, mode, value)
   4496 
   4497 # TODO: Fix via https://github.com/pytorch/pytorch/issues/75798

TypeError: pad(): argument 'input' (position 1) must be Tensor, not str

This seems like a mismatch between what the dataset/dataloader produces and what PyTorch Lightning feeds to the model. What causes it?

The TypeError is not raised by set_transform() itself; it comes from how the batch is unpacked. A Hugging Face dataset returns each example as a dict ({'input': ..., 'label': ...}), and the DataLoader's default collate function batches those examples into a single dict of tensors, which is what the printed dict_keys(['input', 'label']) shows. When validation_step does images, labels = batch, unpacking a dict yields its keys, so images is bound to the string 'input'; EfficientNet then tries to pad that string, hence pad(): argument 'input' (position 1) must be Tensor, not str. The fix is either a collate function that returns (images, labels) tuples, or step methods that index the batch dict by key (both shown below).

To parallelize the preprocessing with Datasets.map() in Google Colab while avoiding slow processing due to idling, you can follow these steps:

  1. Use the num_proc argument: map() in the datasets library takes a num_proc argument that sets how many worker processes run the preprocessing. Set it to the number of CPU cores available in Colab; with 2 cores, num_proc=2 gives you parallel processing.

  2. Preprocess once instead of on every access: set_transform() re-runs the transform each time an example is read, every epoch. Applying the transformation to the entire dataset with map() does the work once and caches the result to disk.

  3. Consider using batched operations: with batched=True, map() passes your function a whole batch of examples per call instead of one example at a time, which is often faster; experiment to see whether it beats per-example processing (see the sketch after this list).

  4. Optimize data loading: since Colab has limited resources and may idle, reduce data-loading overhead by prefetching and caching; the DataLoader sketch after the full example below shows the relevant knobs.

  5. Use a collate function in the DataLoader: a custom collate_fn assembles individual examples into batches of exactly the shape your model expects and is the natural place for any final per-batch preprocessing.
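
As a minimal sketch of item 3 (it assumes the preprocess pipeline and split_dataset from the full example below; batch_size=64 and num_proc=2 are illustrative values, not tuned):

# Batched variant: map() passes `examples` as a dict of columns,
# where each value is a list covering the whole batch
def transform_batched(examples):
    examples['image'] = [preprocess(img) for img in examples['image']]
    return examples

# batched=True hands up to batch_size examples to each call;
# num_proc spreads those calls across worker processes
split_dataset = split_dataset.map(transform_batched, batched=True,
                                  batch_size=64, num_proc=2)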

Here’s a modified version of your code incorporating these suggestions:

from datasets import load_dataset
import torchvision.transforms as transforms
import torch
from torch.utils.data import DataLoader

# Load dataset (the dataset id from your script)
dataset = load_dataset("ares1123/celebrity_dataset")
split_dataset = dataset['train'].train_test_split(test_size=0.25)

# Define preprocessing transformations
preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((256, 256)),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# Apply the transformation once, in parallel, with map().
# A named function is used instead of a lambda because it pickles
# more reliably when num_proc spawns worker processes.
def preprocess_example(example):
    example['image'] = preprocess(example['image'])
    return example

split_dataset = split_dataset.map(preprocess_example, num_proc=2)

# map() stores its results in Arrow format; ask for PyTorch tensors back
split_dataset = split_dataset.with_format("torch")

# Custom collate function: stack the per-example dicts into batched
# tensors and return the (images, labels) tuple the LightningModule
# unpacks in its step methods
def custom_collate_fn(batch):
    images = torch.stack([example['image'] for example in batch])
    labels = torch.stack([example['label'] for example in batch])
    return images, labels

# Create dataloaders
batch_sizes = (32, 8)
train_dataloader = DataLoader(split_dataset['train'], batch_size=batch_sizes[0],
                              shuffle=True, collate_fn=custom_collate_fn)
val_dataloader = DataLoader(split_dataset['test'], batch_size=batch_sizes[1],
                            collate_fn=custom_collate_fn)
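
And on the loading side (item 4), the DataLoader itself can overlap preprocessing with training. This is also how your original set_transform() approach gets parallelized: the transform runs inside each worker process when an example is fetched. A minimal sketch, with worker and prefetch counts that are illustrative rather than tuned:

# num_workers runs dataset __getitem__ (and any set_transform
# preprocessing) in parallel worker processes; each worker keeps
# prefetch_factor batches ready ahead of time, and pin_memory
# speeds up host-to-GPU copies
train_dataloader = DataLoader(
    split_dataset['train'],
    batch_size=batch_sizes[0],
    shuffle=True,
    num_workers=2,        # match the CPU cores available in Colab
    prefetch_factor=2,
    pin_memory=True,
    collate_fn=custom_collate_fn,
)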

With these changes the preprocessing runs in parallel and is cached once up front, and the DataLoader hands your LightningModule (images, labels) batches that it can actually unpack, so Trainer.fit() should run past the pad() TypeError.
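
Alternatively, if you would rather keep set_transform() and the default collate, the other minimal fix is to stop tuple-unpacking the dict batch and index it by key inside the step methods. A sketch of just the changed method, using the keys from your dataset:

    def training_step(self, batch, batch_idx):
        # the default collate yields a dict, so index it by key
        images, labels = batch['input'], batch['label']
        outputs = self(images)
        loss = self.criterion(outputs, labels)
        self.log('train_loss', loss)
        return loss

(and the same change in validation_step).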