I get this error when trying to run distributed training with a custom dataset:

```
ErrorMessage "ModuleNotFoundError: No module named 'torch.distributed._functional_collectives'"
```

I couldn't find what is causing it.
The estimator:

```python
distribution = {"pytorchddp": {"enabled": True}}

# hyperparameters, which are passed into the training job
hyperparameters = {
    # "model_id": model_id,  # pre-trained model
    "dataset_path": dataset_path,  # path where SageMaker will save the training dataset
    "epochs": epochs,  # number of training epochs
    "per_device_train_batch_size": per_device_train_batch_size,  # batch size for training
    "lr": learning_rate,  # learning rate used during training
}

estimator = PyTorch(
    checkpoint_s3_uri=f"s3://{sess.default_bucket()}/v02/checkpoints_dist",
    entry_point="pylight_sc.py",      # training script
    source_dir="scripts",             # directory which includes all the files needed for training
    instance_type="ml.p4d.24xlarge",  # instance type used for the training job
    instance_count=1,                 # number of instances used for training
    max_run=22460*60,                 # max runtime of the training job in seconds
    base_job_name=job_name,           # name of the training job
    role=role,                        # IAM role used by the training job to access AWS resources, e.g. S3
    volume_size=300,                  # size of the EBS volume in GB
    py_version="py310",
    pytorch_version="2.0.1",          # the PyTorch version used in the training job
    framework_version="2.0.0",
    hyperparameters=hyperparameters,
    distribution=distribution,
)
```
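For reference, the job is launched with a plain `estimator.fit()` call, roughly like this (the channel name and S3 prefix below are placeholders, not the exact values used):

```python
# sketch of the launch call; the "training" channel name and S3 path are placeholders
data = {"training": f"s3://{sess.default_bucket()}/v02/dataset"}
estimator.fit(data, wait=True)
```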
And the pylight_sc.py:
```python
def training_function(args):
    # set seed
    set_seed(args.seed)

    env = LightningEnvironment()
    env.world_size = lambda: int(os.environ["WORLD_SIZE"])
    env.global_rank = lambda: int(os.environ["RANK"])

    policy = partial(
        size_based_auto_wrap_policy,
        min_num_params=10000
    )
    fsdp = FSDPStrategy(
        auto_wrap_policy=policy,
        process_group_backend="smddp",
        cluster_environment=env
    )

    dataset = load_from_disk(args.dataset_path)

    # load model from the hub with a bnb config
    bnb_config = BitsAndBytesConfig(
        # load_in_8bit=True,
        load_in_4bit=True,
        # bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
    )
    model = AutoModelForCausalLM.from_pretrained(
        args.model_id,
        use_cache=False if args.gradient_checkpointing else True,  # this is needed for gradient checkpointing
        # trust_remote_code=True,  # ATTENTION: This allows remote code execution
        device_map="auto",
        quantization_config=bnb_config
    )

    # create peft config
    model = create_peft_config(model, args.gradient_checkpointing)

    num_gpus = torch.cuda.device_count()

    # Define training args
    checkpoint_dir = "/opt/ml/checkpoints"
    training_args = TrainingArguments(
        output_dir=checkpoint_dir,
        overwrite_output_dir=True,
        per_device_train_batch_size=args.per_device_train_batch_size,
        bf16=args.bf16,  # Use BF16 if available
        save_total_limit=2,
        learning_rate=args.lr,
        num_train_epochs=args.epochs,
        gradient_checkpointing=args.gradient_checkpointing,
        remove_unused_columns=False,
        # logging strategies
        logging_dir=f"{checkpoint_dir}/logs",
        logging_strategy="steps",
        logging_steps=10,
        save_strategy="steps",
        save_steps=300,
        report_to=["wandb"],
    )

    # Create Trainer instance
    trainer = pl.Trainer(
        model=model,
        strategy=fsdp,
        args=training_args,
        devices=num_gpus,
        train_dataset=dataset,
        data_collator=default_data_collator,
    )

    # Start training
    trainer.train(resume_from_checkpoint=True)
```
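The arguments are parsed at the bottom of the script with a standard argparse block, roughly along these lines (the defaults shown here are placeholders, not the exact values used):

```python
import argparse

def parse_args():
    # SageMaker passes the estimator hyperparameters to the entry point as CLI arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_id", type=str, default="model-id-placeholder")
    parser.add_argument("--dataset_path", type=str, default="/opt/ml/input/data/training")
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--per_device_train_batch_size", type=int, default=1)
    parser.add_argument("--lr", type=float, default=2e-4)
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--bf16", type=bool, default=True)
    parser.add_argument("--gradient_checkpointing", type=bool, default=True)
    return parser.parse_args()

if __name__ == "__main__":
    args = parse_args()
    training_function(args)
```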