Early stopping for eval loss causes timeout?

Hello,

I am using the run_glue_no_trainer.py (LINK) example and modified it to my needs. I am running the example for bert-base-uncased using accelerate launch.

I added my early stopping class as follows:

class early_stopping_callback:
    def __init__(self, min_delta=0, patience=5):
        self.min_delta = min_delta
        self.patience = patience
        self.counter = 0
        self.lowest_loss = float('inf')

    def check_early_stopping(self, eval_loss):
        # An improvement is a drop of at least min_delta below the best loss so far.
        delta = self.lowest_loss - eval_loss
        if delta >= self.min_delta:
            self.lowest_loss = eval_loss
            self.counter = 0
        else:
            # No improvement: signal stopping after `patience` such evaluations in a row.
            self.counter += 1
            if self.counter >= self.patience:
                return True
        return False

After being initialized as

es_callback = early_stopping_callback()

the condition is checked at the end of an epoch (added at around line 622 in the original file):

        if args.checkpointing_steps == "epoch":
            output_dir = f"epoch_{epoch}"
            if args.output_dir is not None:
                output_dir = os.path.join(args.output_dir, output_dir)
            accelerator.save_state(output_dir)

        if es_callback.check_early_stopping(eval_loss.item()):
            print(f"Stopping early after epoch {epoch}")
            break
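
To make the patience logic concrete, here is a toy run of the callback on a made-up loss sequence (the values are purely illustrative):

es = early_stopping_callback(min_delta=0, patience=3)
for epoch, loss in enumerate([0.9, 0.7, 0.71, 0.72, 0.73, 0.74]):
    if es.check_early_stopping(loss):
        # Fires at epoch 4: the best loss came at epoch 1, and epochs 2-4 are
        # three consecutive evaluations without improvement.
        print(f"Stopping early after epoch {epoch}")
        break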

As you can see in the picture below, the criterion is reached at a certain point.

The weird thing is what happens next: The progress bar reports one additional step (I think), and after that, it stops doing anything. This also means that subsequent scheduled runs do not execute properly. Instead, I get a timeout error:

[E ProcessGroupNCCL.cpp:828] [Rank 1] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=7462, OpType=ALLREDUCE, Timeout(ms)=1800000) ran for 1800586 milliseconds before timing out.
[E ProcessGroupNCCL.cpp:828] [Rank 0] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=7463, OpType=ALLREDUCE, Timeout(ms)=1800000) ran for 1800814 milliseconds before timing out.
[E ProcessGroupNCCL.cpp:455] Some NCCL operations have failed or timed out. Due to the asynchronous nature of CUDA kernels, subsequent GPU operations might run on corrupted/incomplete data.
[E ProcessGroupNCCL.cpp:460] To avoid data inconsistency, we are taking the entire process down.
WARNING:torch.distributed.elastic.multiprocessing.api:Sending process 4174882 closing signal SIGTERM
ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: -6) local_rank: 1 (pid: 4174883) of binary: /home/lange/.conda/envs/test/bin/python3.9
Traceback (most recent call last):
  File "/home/test/.conda/envs/test/bin/accelerate", line 10, in <module>
    sys.exit(main())
  File "/home/test/.conda/envs/test/lib/python3.9/site-packages/accelerate/commands/accelerate_cli.py", line 45, in main
    args.func(args)
  File "/home/test/.conda/envs/test/lib/python3.9/site-packages/accelerate/commands/launch.py", line 970, in launch_command
    multi_gpu_launcher(args)
  File "/home/test/.conda/envs/test/lib/python3.9/site-packages/accelerate/commands/launch.py", line 646, in multi_gpu_launcher
    distrib_run.run(args)
  File "/home/test/.conda/envs/test/lib/python3.9/site-packages/torch/distributed/run.py", line 785, in run
    elastic_launch(
  File "/home/test/.conda/envs/test/lib/python3.9/site-packages/torch/distributed/launcher/api.py", line 134, in __call__
    return launch_agent(self._config, self._entrypoint, list(args))
  File "/home/test/.conda/envs/test/lib/python3.9/site-packages/torch/distributed/launcher/api.py", line 250, in launch_agent
    raise ChildFailedError(
torch.distributed.elastic.multiprocessing.errors.ChildFailedError: 

I suspect it might be an improper use of the break statement in combination with how Accelerate works. Hopefully somebody can help out.

Thanks for the report, let me run this today and try to reproduce it to see if I can find what's up. (The key is that there's a distributed reduce() op being done that is timing out.)

You can also try:

pip install git+https://github.com/huggingface/accelerate

And run your code with:

ACCELERATE_DEBUG_MODE="1" accelerate launch ...

This will give us a clearer error, I believe (or at least it should).

Sorry for my late answer, I did not have access to the machine running the scripts over the last few days.

Unfortunately, using ACCELERATE_DEBUG_MODE="1" did not change the error message, and neither did updating accelerate.

Hi, did you manage to reproduce the error? I am still having the same problem.

Thanks for the ping, sorry! Will look at this today (truly today)

@GertDasPferd thanks! This is a classic DDP issue: basically the break gets triggered on process 0 but never on the other processes, leading to the hang. I’ve introduced a PR here (Introduce breakpoint API by muellerzr · Pull Request #1940 · huggingface/accelerate · GitHub) to add a utility that can help with this, and you can see the basic setup there. It was inspired by this post: How to use "break" in DistributedDataParallel training - #7 by Rakshith_V - distributed-rpc - PyTorch Forums, and it made sense to just include this as part of the API for Accelerate.
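
For illustration, the underlying stop-flag pattern from that forum post looks roughly like this. It is only a sketch of the idea, not the PR's actual implementation, and it assumes the accelerator and es_callback from the script above, plus placeholder names num_epochs and eval_loss:

import torch

for epoch in range(num_epochs):
    # ... training and evaluation as before, producing eval_loss ...

    # Each rank sets a local 0/1 flag based on its own early-stopping check.
    stop_flag = torch.tensor(
        1 if es_callback.check_early_stopping(eval_loss.item()) else 0,
        device=accelerator.device,
    )
    # The reduce is a collective op that every rank takes part in, so either
    # all processes break together or none of them do.
    if accelerator.reduce(stop_flag, reduction="sum").item() > 0:
        break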

Thank you for your investigation. I have to admit that I do not understand all the details of the problem.

Anyway, from how I understand your PR:
I would use accelerator.set_breakpoint() when checking for the early stopping criterion. But where would I use the second function, accelerator.check_breakpoint()?

You’d use this immediately after if es_callback.check_early_stopping():

E.g.:

if args.checkpointing_steps == "epoch":
    output_dir = f"epoch_{epoch}"
    if args.output_dir is not None:
        output_dir = os.path.join(args.output_dir, output_dir)
    accelerator.save_state(output_dir)

if es_callback.check_early_stopping(eval_loss.item()):
    print(f"Stopping early after epoch {epoch}")
    accelerator.set_breakpoint()
if accelerator.check_breakpoint():
    break
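
As I understand the PR, the important detail is that check_breakpoint() is evaluated on every process, so all ranks see the same answer and break together, rather than rank 0 breaking on its own and leaving the other ranks waiting in a collective op.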

Hi again. Thanks for the response. I have been trying to properly update my environment to accelerate 0.23.0 but have been having some problems. As soon as I manage to fix them, I will try out the solution and check whether it works properly.

Cheers!

Just to give a final update: the flags introduced with your PR solved the issue! Early stopping is now working as expected.

Hi @muellerzr, is set_breakpoint disabled now? I got AttributeError: 'Accelerator' object has no attribute 'set_breakpoint' with the latest accelerate (v0.31.0).

Actually I am very excited to find this PR, since it seems quite relevant to my problem (similar to the early stopping problem). I'm using 2 GPUs to train a RoBERTa model, and I want to calculate the validation loss every eval_steps steps. Here's the core code:

...
model, optimizer, train_dataloader, val_dataloader = accelerator.prepare(model, optimizer, train_dataloader, val_dataloader)
...
best_eval_loss = float('inf')

# Training loop:
for epoch in range(num_epochs): 
    for batch in train_dataloader:
        model.train()
        outputs = model(**batch)
        loss = outputs.loss 
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()
        progress_bar.update(1)

        # validation when reach 'eval_steps'
        if progress_bar.n > 10 and progress_bar.n % args.eval_steps == 0:
            model.eval()
            losses = []
            for val_batch in val_dataloader:
                # ----------------> stuck here!
                with torch.no_grad():
                    outputs = model(**val_batch)
                e_loss = outputs.loss
                losses.append(accelerator.gather_for_metrics(e_loss.repeat(val_bs)))
            losses = torch.cat(losses)
            eval_loss = torch.mean(losses)
            print("eval_loss", eval_loss.item())
            
            # save the current best model
            if eval_loss.item() < best_eval_loss:
                print("Current best model!, Steps:", progress_bar.n, "Eval loss:", eval_loss.item())
                # save
                ...

The process just gets stuck when computing the first validation batch. Both GPUs are at 100% utilization, so I guess the process is hanging.

After a long time, this error occurs:

[E ProcessGroupNCCL.cpp:828] [Rank 1] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=2315, OpType=ALLREDUCE, Timeout(ms)=1800000) ran for 1808543 milliseconds before timing out.
[E ProcessGroupNCCL.cpp:828] [Rank 0] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=2315, OpType=ALLGATHER, Timeout(ms)=1800000) ran for 1808546 milliseconds before timing out.
...
Traceback (most recent call last):
  File "rolling_model_train.py", line 289, in <module>
    train()
  File "rolling_model_train.py", line 195, in train
    losses.append(accelerator.gather_for_metrics(loss.repeat(val_bs)))
  File "/home/guoby/app/Anaconda3-2021.05/envs/news/lib/python3.8/site-packages/accelerate/accelerator.py", line 2217, in gather_for_metrics
[E ProcessGroupNCCL.cpp:455] Some NCCL operations have failed or timed out. Due to the asynchronous nature of CUDA kernels, subsequent GPU operations might run on corrupted/incomplete data.
[E ProcessGroupNCCL.cpp:460] To avoid data inconsistency, we are taking the entire process down.
terminate called after throwing an instance of 'std::runtime_error'
  what():  [Rank 0] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=2315, OpType=ALLGATHER, Timeout(ms)=1800000) ran for 1808546 milliseconds before timing out.
[E ProcessGroupNCCL.cpp:455] Some NCCL operations have failed or timed out. Due to the asynchronous nature of CUDA kernels, subsequent GPU operations might run on corrupted/incomplete data.
[E ProcessGroupNCCL.cpp:460] To avoid data inconsistency, we are taking the entire process down.
terminate called after throwing an instance of 'std::runtime_error'
  what():  [Rank 1] Watchdog caught collective operation timeout: WorkNCCL(SeqNum=2315, OpType=ALLREDUCE, Timeout(ms)=1800000) ran for 1808543 milliseconds before timing out.
ERROR:torch.distributed.elastic.multiprocessing.api:failed (exitcode: -6) local_rank: 0 (pid: 89866) of binary: /home/guoby/app/Anaconda3-2021.05/envs/news/bin/python
Traceback (most recent call last):
  File "/home/guoby/app/Anaconda3-2021.05/envs/news/bin/accelerate", line 8, in <module>
    sys.exit(main())
  File "/home/guoby/app/Anaconda3-2021.05/envs/news/lib/python3.8/site-packages/accelerate/commands/accelerate_cli.py", line 47, in main
    args.func(args)
  File "/home/guoby/app/Anaconda3-2021.05/envs/news/lib/python3.8/site-packages/accelerate/commands/launch.py", line 977, in launch_command
    multi_gpu_launcher(args)
  File "/home/guoby/app/Anaconda3-2021.05/envs/news/lib/python3.8/site-packages/accelerate/commands/launch.py", line 646, in multi_gpu_launcher
    distrib_run.run(args)
  File "/home/guoby/app/Anaconda3-2021.05/envs/news/lib/python3.8/site-packages/torch/distributed/run.py", line 785, in run
    elastic_launch(
  File "/home/guoby/app/Anaconda3-2021.05/envs/news/lib/python3.8/site-packages/torch/distributed/launcher/api.py", line 134, in __call__
    return launch_agent(self._config, self._entrypoint, list(args))
  File "/home/guoby/app/Anaconda3-2021.05/envs/news/lib/python3.8/site-packages/torch/distributed/launcher/api.py", line 250, in launch_agent
    raise ChildFailedError(
torch.distributed.elastic.multiprocessing.errors.ChildFailedError: 
======================================================
rolling_model_train.py FAILED
------------------------------------------------------
Failures:
[1]:
  time      : 2024-06-20_15:02:22
  rank      : 1 (local_rank: 1)
  exitcode  : -6 (pid: 89867)
  error_file: <N/A>
  traceback : Signal 6 (SIGABRT) received by PID 89867
------------------------------------------------------
Root Cause (first observed failure):
[0]:
  time      : 2024-06-20_15:02:22
  rank      : 0 (local_rank: 0)
  exitcode  : -6 (pid: 89866)
  error_file: <N/A>
  traceback : Signal 6 (SIGABRT) received by PID 89866
======================================================

Note that if I move the validation code to after each epoch, instead of running it during an epoch, the program runs fine! It looks like this:

# Training loop:
for epoch in range(num_epochs): 
    for batch in train_dataloader:
        model.train()
        outputs = model(**batch)
        loss = outputs.loss 
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()
        progress_bar.update(1)

    # validation:
    model.eval()
    losses = []
    for val_batch in val_dataloader:
        with torch.no_grad():
            outputs = model(**val_batch)
        e_loss = outputs.loss
        losses.append(accelerator.gather_for_metrics(e_loss.repeat(val_bs)))
    losses = torch.cat(losses)
    eval_loss = torch.mean(losses)
    print("eval_loss", eval_loss.item())

Could you help me with this? Thanks a lot!