@philschmid I've included the training and inference scripts, as well as the SageMaker train and deploy code below. Let me know if I can provide any other detail - thanks.
- train.py
- inference.py
- Launch SageMaker Training Job
- SageMaker Endpoint deployment
train.py
import os
import ast
# upgrade the datasets library inside the training container before it is imported below
print(os.system('python -m pip install datasets --upgrade'))
from transformers import (
AutoModel,
Trainer,
TrainingArguments,
DataCollatorForLanguageModeling,
AutoTokenizer,
AutoFeatureExtractor,
AutoModelForMaskedLM,
default_data_collator,
AutoModelForSequenceClassification
)
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from datasets import load_dataset, Dataset
import random
import logging
import sys
import argparse
import torch
import numpy as np
import pandas as pd
import datasets
print('datasets.__version__', datasets.__version__)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--train_batch_size", type=int, default=32)
    parser.add_argument("--eval_batch_size", type=int, default=64)
    parser.add_argument("--warmup_steps", type=int, default=500)
    parser.add_argument("--model_id", type=str)
    parser.add_argument("--num_labels", type=str)
    parser.add_argument("--labels", type=str)
    parser.add_argument("--learning_rate", type=float, default=5e-5)
    parser.add_argument("--train_file", type=str, default="train.DbEmbeddings")
    parser.add_argument("--test_file", type=str, default="test.DbEmbeddings")
    parser.add_argument("--fp16", type=bool, default=True)

    # Data, model, and output directories
    parser.add_argument("--output_data_dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"])
    parser.add_argument("--model_dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--n_gpus", type=str, default=os.environ["SM_NUM_GPUS"])
    parser.add_argument("--training_dir", type=str, default=os.environ["SM_CHANNEL_TRAIN"])
    parser.add_argument("--test_dir", type=str, default=os.environ["SM_CHANNEL_TEST"])
    args, _ = parser.parse_known_args()

    # Set up logging
    logger = logging.getLogger(__name__)
    logging.basicConfig(
        level=logging.getLevelName("INFO"),
        handlers=[logging.StreamHandler(sys.stdout)],
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    print('\nWalk 1:')
    for path, subdirs, files in os.walk('/opt/ml'):
        for name in files: print(os.path.join(path, name))

    raw_train_dataset = load_dataset("json", data_files=os.path.join(args.training_dir, args.train_file), cache_dir="/opt/ml/input")["train"]
    raw_test_dataset = load_dataset("json", data_files=os.path.join(args.test_dir, args.test_file), cache_dir="/opt/ml/input")["train"]

    print('\nargs.labels', args.labels)
    print('type args.labels', type(args.labels))
    print('args.num_labels', args.num_labels)

    args.labels = ast.literal_eval(args.labels)
    args.num_labels = len(args.labels)

    print('\nargs.labels', args.labels)
    print('type args.labels', type(args.labels))
    print('args.num_labels', args.num_labels)
    print('type(args.num_labels)', type(args.num_labels))

    raw_train_dataset = raw_train_dataset.cast_column("label", datasets.ClassLabel(num_classes=args.num_labels,
                                                                                   names=args.labels, names_file=None, id=None))
    print('\nraw_train_dataset.features', raw_train_dataset.features)

    # load tokenizer
    tokenizer = AutoTokenizer.from_pretrained(args.model_id)

    def tokenize(examples):
        result = tokenizer(examples["source"], padding=True, truncation=True)
        return result

    # Use batched=True to activate fast multithreading!
    train_dataset = raw_train_dataset.map(
        tokenize, batched=True, batch_size=None
    )
    test_dataset = raw_test_dataset.map(
        tokenize, batched=True, batch_size=None
    )

    train_dataset.reset_format()
    test_dataset.reset_format()
    train_dataset.set_format("torch",
                             columns=["input_ids", "attention_mask", "label"])

    # Split the test channel 50/50 into a test set and a validation set
    test_dataset.set_format(type="pandas")
    df = test_dataset[:]
    df_test, df_valid = np.split(df, [int(.5*len(df))])
    test_data = Dataset.from_pandas(df_test)
    valid_data = Dataset.from_pandas(df_valid)

    test_data = test_data.cast_column("label", datasets.ClassLabel(num_classes=args.num_labels,
                                                                   names=args.labels,
                                                                   names_file=None, id=None))
    valid_data = valid_data.cast_column("label", datasets.ClassLabel(num_classes=args.num_labels,
                                                                     names=args.labels, names_file=None,
                                                                     id=None))
    test_data.reset_format()
    test_data.set_format("torch",
                         columns=["input_ids", "attention_mask", "label"])
    valid_data.reset_format()
    valid_data.set_format("torch",
                          columns=["input_ids", "attention_mask", "label"])

    from sklearn.metrics import accuracy_score, f1_score

    def compute_metrics(pred):
        labels = pred.label_ids
        preds = pred.predictions.argmax(-1)
        f1 = f1_score(labels, preds, average="weighted")
        acc = accuracy_score(labels, preds)
        return {"accuracy": acc, "f1": f1}

    # Load the pre-trained model onto the available device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = (AutoModelForSequenceClassification
             .from_pretrained(args.model_id, num_labels=args.num_labels)
             .to(device))

    batch_size = 64
    logging_steps = len(train_dataset) // batch_size
    print('device', device)
    print('logging steps', logging_steps)
    print('len(train_dataset)', len(train_dataset))

    model_name = f"{args.model_id}-finetuned-d"
    training_args = TrainingArguments(output_dir=model_name,
                                      num_train_epochs=args.epochs,
                                      learning_rate=2e-5,
                                      per_device_train_batch_size=batch_size,
                                      per_device_eval_batch_size=batch_size,
                                      weight_decay=0.01,
                                      evaluation_strategy="epoch",
                                      disable_tqdm=False,
                                      logging_steps=logging_steps,
                                      push_to_hub=False,
                                      )
    trainer = Trainer(model=model, args=training_args,
                      compute_metrics=compute_metrics,
                      train_dataset=train_dataset,
                      eval_dataset=valid_data,
                      tokenizer=tokenizer)
    trainer.train()

    preds_output = trainer.predict(test_data)
    print('.')
    print('preds_output.metrics:')
    print(preds_output.metrics)

    # Saves the model to the SageMaker model dir, which gets uploaded to S3
    trainer.save_model(args.model_dir)

    print(f'my_acc: {preds_output.metrics["test_accuracy"]}')

    print('\nModel Walk:')
    for path, subdirs, files in os.walk('/opt/ml'):
        for name in files: print(os.path.join(path, name))
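For reference, train.py reads the train/test channels with load_dataset("json", ...), so each record needs a "source" text field and a "label" field that lines up with the labels hyperparameter. A minimal illustration of writing such a file (the field values below are invented):

import json

# Illustrative records only - one JSON object per line, with the text to classify
# ("source") and its class ("label"). Depending on the datasets version, "label"
# may need to be the integer class index rather than the class name.
sample_records = [
    {"source": "ACME CORP PAYROLL 2021-06-01", "label": "income"},
    {"source": "COFFEE SHOP 123 MAIN ST", "label": "expense"},
]

with open("train.DbEmbeddings", "w") as f:
    for record in sample_records:
        f.write(json.dumps(record) + "\n")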
inference.py
import subprocess
import sys
import json
import os
import numpy as np
import torch
import boto3
from transformers import AutoModel, AutoTokenizer, AutoModelForMaskedLM
from importlib import reload
print('\nboto3 loaded')
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('\ndevice:',device)
def forward_pass(batch, model):
    input_ids = torch.tensor(batch["input_ids"]).to(device)
    attention_mask = torch.tensor(batch["attention_mask"]).to(device)
    with torch.no_grad():
        last_hidden_state = model(input_ids, attention_mask).last_hidden_state
        last_hidden_state = last_hidden_state.cpu().numpy()
    # Use average of unmasked hidden states for classification
    lhs_shape = last_hidden_state.shape
    boolean_mask = ~np.array(batch["attention_mask"]).astype(bool)
    boolean_mask = np.repeat(boolean_mask, lhs_shape[-1], axis=-1)
    boolean_mask = boolean_mask.reshape(lhs_shape)
    masked_mean = np.ma.array(last_hidden_state, mask=boolean_mask).mean(axis=1)
    batch["hidden_state"] = masked_mean.data
    return batch

def preprocess_function(examples):
    print('attempting to tokenize:', examples)
    if examples['type'] == 'incomeType':
        my_type = 'income'
    else:
        my_type = 'expense'
    print('my_type', my_type)
    t = tokenizer([examples[my_type]["description"]], truncation=True)
    t['description'] = examples[my_type]["description"]
    t['date'] = examples[my_type]["date"]
    t['amount'] = examples[my_type]["amount"]
    t['type'] = examples['type']
    print('t', t)
    return t

print('\nos.getcwd()', os.getcwd())
print('\nModel Walk:')
for path, subdirs, files in os.walk('/opt/ml'):
    for name in files: print(os.path.join(path, name))

tokenizer = AutoTokenizer.from_pretrained('/opt/ml/model')

def model_fn(model_dir):
    print('\nIn model_fn')
    print('\nmodel_dir:', model_dir)
    model = AutoModel.from_pretrained('/opt/ml/model', output_hidden_states=True).to(device)
    return model

print('\ninference directory', os.listdir(os.curdir))
print('\nWalk:')
for path, subdirs, files in os.walk('/opt/ml'):
    for name in files: print(os.path.join(path, name))

def input_fn(data, content_type):
    print('\nin data', data, content_type, type(data))
    request = json.loads(data)
    # preprocess dataset
    print('attempting preprocess')
    print('request', request)
    response = preprocess_function(request)
    print('response', response)
    print('\nfwd pass')
    return response

def predict_fn(data, model):
    print('\nin predict:', data)
    res2 = forward_pass(data, model)
    return res2

def output_fn(prediction, accept):
    print('\nin output', type(prediction))
    j = [{"description": prediction.description,
          "amount": prediction.amount,
          "type": prediction.type,
          "date": prediction.date,
          "inputs": prediction.input_ids,
          "embeddings": prediction.hidden_state.tolist()}]
    return json.dumps(j)
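For local debugging, the four handlers can be chained in roughly the order the endpoint uses by temporarily appending something like this to the bottom of inference.py and running it where a model/tokenizer directory exists at /opt/ml/model (a sketch only; the payload values are invented and just mirror what preprocess_function reads):

if __name__ == "__main__":
    # Invented request shaped like the JSON that input_fn receives
    sample_request = json.dumps({
        "type": "expenseType",  # anything other than 'incomeType' is treated as an expense
        "expense": {"description": "COFFEE SHOP 123 MAIN ST",
                    "date": "2021-06-01",
                    "amount": 4.50},
    })
    model = model_fn("/opt/ml/model")
    data = input_fn(sample_request, "application/json")
    prediction = predict_fn(data, model)
    print(output_fn(prediction, "application/json"))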
Launch SageMaker Training Job
hyperparameters={'epochs': 1, # number of training epochs
'labels' : self.preprocessor.modeldata.labels,
'num_labels' : len(self.preprocessor.modeldata.labels),
'train_batch_size': 32, # batch size for training
'eval_batch_size': 64, # batch size for evaluation
'learning_rate': 0.0001, # learning rate used during training
'model_id':self.hf_model_id, # pre-trained model
'weight_decay': 0.0005,
'fp16': True, # Whether to use 16-bit (mixed) precision training
}
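(For context on the labels hyperparameter: the SageMaker SDK passes hyperparameter values to train.py as command-line strings, so the list arrives serialized and ast.literal_eval turns it back into a Python list. A small illustration, with example label names:)

import ast

# Illustration of how train.py recovers the labels list from the serialized hyperparameter
labels_arg = '["income", "expense"]'   # example of what --labels looks like on the command line
labels = ast.literal_eval(labels_arg)
num_labels = len(labels)
print(labels, num_labels)              # ['income', 'expense'] 2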
metric_definitions=[{"Name": "my:acc", "Regex": "my_acc: (\S+)"},
{'Name': 'eval_loss', 'Regex': "'eval_loss': ([0-9]+(.|e\-)[0-9]+),?"},
{'Name': 'eval_accuracy', 'Regex': "'eval_accuracy': ([0-9]+(.|e\-)[0-9]+),?"},
{'Name': 'eval_f1', 'Regex': "'eval_f1': ([0-9]+(.|e\-)[0-9]+),?"},
{'Name': 'eval_precision', 'Regex': "'eval_precision': ([0-9]+(.|e\-)[0-9]+),?"}]
objective_metric_name = 'eval_loss'
objective_type = "Minimize"  # or "Maximize"
instance_type = 'ml.g4dn.2xlarge'
# create the Estimator
estimator = HuggingFace(
entry_point = 'train.py', # fine-tuning script used in training job
source_dir = 'embed_source', # directory where fine-tuning script is stored
instance_type = instance_type, # instances type used for the training job
instance_count = 1, # the number of instances used for training
role = get_execution_role(), # IAM role used in the training job to access AWS resources
transformers_version = '4.6', # the transformers version used in the training job
max_run= 36000,
pytorch_version = '1.7', # the pytorch_version version used in the training job
py_version = 'py36', # the python version used in the training job
hyperparameters = hyperparameters, # the hyperparameter used for running the training job
metric_definitions = metric_definitions, # the metrics regex definitions to extract logs
output_path=os.path.join(dataconnector.version_s3_prefix, "models"),
code_location=os.path.join(dataconnector.version_s3_prefix, "models")
)
inputs = {
'train': s3_input,
'test': s3_input_eval
}
self.training_job_name = 'trainjob-' + dataconnector.model_name
print('\ninputs', inputs)
estimator.fit(inputs=inputs, wait=True,
job_name=self.training_job_name,
logs=True)
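(s3_input and s3_input_eval above are the S3 locations of the train and test channels; a sketch of how such inputs can be built, with placeholder bucket and prefix values:)

from sagemaker.inputs import TrainingInput

# Placeholder URIs for illustration only - the real locations differ
s3_input = TrainingInput("s3://my-bucket/my-prefix/train/", content_type="application/json")
s3_input_eval = TrainingInput("s3://my-bucket/my-prefix/test/", content_type="application/json")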
SageMaker Endpoint deployment
Note that in the code below I've commented out model_server_workers = 4; the error occurs either way. However, if I don't set it, the log shows "Default workers per model: 1", and if I set model_server_workers = 4, the log shows "Default workers per model: 4", but it still throws the worker error.
emb_sm_model = HuggingFaceModel(transformers_version="4.6", # transformers version used
pytorch_version="1.7", # pytorch version used
py_version='py36', # python version used
entry_point = 'embed_source/inference.py',# configuration for loading model from Hub
model_data=emb_model.model_artifacts,
#model_server_workers = 4,
sagemaker_session=sagemaker_session,
name= emb_name, role=role)
# I've omitted code for the second and third containers, as the error is happening in container 1/the huggingface container
sm_model = PipelineModel(name=pipeline_name, role=role, models=[emb_sm_model, feat_process_sm_model, clf_sm_model])
sm_model.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge',
endpoint_name=pipeline_name, wait=False)
I've tried different instance types ('ml.m5.2xlarge', 'ml.m5.4xlarge'). The only thing that seems to mitigate it is increasing the instance count, but even then I run into the same error once the request load gets higher.
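For reproducing the issue, a single request of the shape that input_fn expects can be sent to the pipeline endpoint like this (a sketch; the payload values are invented and just mirror preprocess_function):

import json
import boto3

runtime = boto3.client("sagemaker-runtime")

# Invented payload matching the fields preprocess_function reads in inference.py
payload = {
    "type": "expenseType",
    "expense": {"description": "COFFEE SHOP 123 MAIN ST",
                "date": "2021-06-01",
                "amount": 4.50},
}

response = runtime.invoke_endpoint(
    EndpointName=pipeline_name,        # the name used in sm_model.deploy(...) above
    ContentType="application/json",
    Body=json.dumps(payload),
)
print(response["Body"].read())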