Distributed Data Parallel in SageMaker

I get this error when I try to run training on a custom dataset with distributed training: ErrorMessage "ModuleNotFoundError: No module named 'torch.distributed._functional_collectives'"

I couldn’t find what caused this error.
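For context, a quick check like the following at the top of the entry point prints the versions actually present inside the training container and whether the missing module is importable at all (a diagnostic sketch, not something the current job runs):

import importlib.util
import torch
import pytorch_lightning as pl

# Versions actually installed in the training container
print("torch:", torch.__version__)
print("pytorch_lightning:", pl.__version__)

# Is the module from the traceback importable at all?
spec = importlib.util.find_spec("torch.distributed._functional_collectives")
print("_functional_collectives available:", spec is not None)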

The estimator:

distribution = {"pytorchddp": {"enabled": True}}
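Since the training script sets process_group_backend="smddp", the SageMaker data parallel library can also be enabled explicitly through the distribution argument; a sketch of that variant, worth double-checking against the SDK version in use:

distribution = {"smdistributed": {"dataparallel": {"enabled": True}}}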

The hyperparameters, which are passed into the training job:

hyperparameters = {
    # 'model_id': model_id,  # pre-trained model
    'dataset_path': dataset_path,  # path where SageMaker will save the training dataset
    'epochs': epochs,  # number of training epochs
    'per_device_train_batch_size': per_device_train_batch_size,  # batch size for training
    'lr': learning_rate,  # learning rate used during training
}
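SageMaker passes each of these keys to the entry point as a command-line flag (--dataset_path, --epochs, and so on), so pylight_sc.py is expected to parse them roughly like this (a sketch with illustrative defaults; the real parser in the script may differ, and training_function below also reads args.model_id, args.seed, args.bf16 and args.gradient_checkpointing, which would need entries or defaults of their own):

import argparse

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset_path", type=str)  # where the dataset was saved
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--per_device_train_batch_size", type=int, default=1)
    parser.add_argument("--lr", type=float, default=2e-5)
    return parser.parse_args()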

estimator = PyTorch(
    checkpoint_s3_uri = f's3://{sess.default_bucket()}/v02/checkpoints_dist',
    entry_point = 'pylight_sc.py',  # training script
    source_dir = 'scripts',  # directory that includes all the files needed for training
    instance_type = 'ml.p4d.24xlarge',  # instance type used for the training job
    instance_count = 1,  # number of instances used for training
    max_run = 2*24*60*60,  # maximum runtime of the training job in seconds
    base_job_name = job_name,  # name of the training job
    role = role,  # IAM role used by the training job to access AWS resources, e.g. S3
    volume_size = 300,  # size of the EBS volume in GB
    py_version = 'py310',
    pytorch_version = '2.0.1',  # PyTorch version used in the training job
    framework_version = '2.0.0',
    hyperparameters = hyperparameters,
    distribution = distribution
)
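With the estimator defined, the job itself is started with fit(); a minimal sketch, where training_input_s3_uri is a placeholder for the S3 prefix holding the prepared dataset:

# "training" is an illustrative channel name; training_input_s3_uri is a placeholder
estimator.fit({"training": training_input_s3_uri}, wait=True)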

And the training script, pylight_sc.py:

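The excerpt below relies on imports that are not shown; based on the calls it makes, the top of the file presumably looks roughly like this (an assumption, since the full file isn't posted, and the exact module paths depend on the Lightning version; create_peft_config appears to be a helper defined elsewhere in the script):

import os
from functools import partial

import torch
import pytorch_lightning as pl
from pytorch_lightning.plugins.environments import LightningEnvironment
from pytorch_lightning.strategies import FSDPStrategy
from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy

from datasets import load_from_disk
from transformers import (
    AutoModelForCausalLM,
    BitsAndBytesConfig,
    TrainingArguments,
    default_data_collator,
    set_seed,
)
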
def training_function(args):
    # set seed
    set_seed(args.seed)

    env = LightningEnvironment()
    env.world_size = lambda: int(os.environ["WORLD_SIZE"])
    env.global_rank = lambda: int(os.environ["RANK"])

    policy = partial(
        size_based_auto_wrap_policy,
        min_num_params=10000
    )

    fsdp = FSDPStrategy(
        auto_wrap_policy=policy,
        process_group_backend="smddp",
        cluster_environment=env
    )

    dataset = load_from_disk(args.dataset_path)

    # load model from the hub with a bnb config
    bnb_config = BitsAndBytesConfig(
        # load_in_8bit=True,
        load_in_4bit=True,
        # bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )

    model = AutoModelForCausalLM.from_pretrained(
        args.model_id,
        use_cache=False if args.gradient_checkpointing else True,  # this is needed for gradient checkpointing
        # trust_remote_code=True,  # ATTENTION: this allows remote code execution
        device_map="auto",
        quantization_config=bnb_config
    )

    # create peft config
    model = create_peft_config(model, args.gradient_checkpointing)
    num_gpus = torch.cuda.device_count()

    # Define training args
    checkpoint_dir = "/opt/ml/checkpoints"
    training_args = TrainingArguments(
        output_dir=checkpoint_dir,
        overwrite_output_dir=True,
        per_device_train_batch_size=args.per_device_train_batch_size,
        bf16=args.bf16,  # use BF16 if available
        save_total_limit=2,
        learning_rate=args.lr,
        num_train_epochs=args.epochs,
        gradient_checkpointing=args.gradient_checkpointing,
        remove_unused_columns=False,
        # logging strategies
        logging_dir=f"{checkpoint_dir}/logs",
        logging_strategy="steps",
        logging_steps=10,
        save_strategy="steps",
        save_steps=300,
        report_to=["wandb"],
    )

    # Create Trainer instance
    trainer = pl.Trainer(
        model=model,
        strategy=fsdp,
        args=training_args,
        devices=num_gpus,
        train_dataset=dataset,
        data_collator=default_data_collator,
    )

    # Start training
    trainer.train(resume_from_checkpoint=True)