Getting "No worker is available to serve request: model" with HuggingFaceModel endpoint

I have a custom container that takes a request, does some feature extraction and then passes on the enhanced request to a classifier endpoint. During feature extraction another endpoint is being called for generating text embeddings. I am using HuggingFaceModel for my embedding endpoint.


HuggingFaceModel(
    transformers_version="4.6",  # transformers version used
    pytorch_version="1.7",  # pytorch version used
    py_version='py36',  # python version used
    entry_point='embed_source/inference.py',
    model_data=emb_model.model_artifacts,
    name=emb_name,
    role=role,
)

It is currently deployed on an ml.m5.xlarge (CPU) and has been working well up until this point. Today I got the error message “No worker is available to serve request: model” during a small spike in requests (~500). Looking at the logs, it was working fine and then started receiving this response when calling the embedding model. I tried stress testing the endpoint myself but was able to do 5K requests in a minute without any issue, so it doesn’t seem to be strictly a volume issue?

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2073, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1518, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1516, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1502, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "/opt/program/predictor.py", line 56, in transformation
    result = preprocessor.transform(data)
  File "/opt/program/preprocessor.py", line 189, in transform
    response = embed_predictor.predict(data=json.dumps(payload))
  File "/usr/local/lib/python3.7/site-packages/sagemaker/predictor.py", line 136, in predict
    response = self.sagemaker_session.sagemaker_runtime_client.invoke_endpoint(**request_args)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 386, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 705, in _make_api_call
    raise error_class(parsed_response, operation_name)


botocore.errorfactory.ModelError: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received server error (503) from primary with message "{
  "code": 503,
  "type": "ServiceUnavailableException",
  "message": "No worker is available to serve request: model"
}

EDIT: I just realized you can pass the arg model_server_workers directly to HuggingFaceModel.

There seems to be an easy way to add more workers by passing env = {'MODEL_SERVER_WORKERS': 'X'} when calling the HuggingFaceModel, per this link

However, I don’t understand why this is triggered, so I don’t have any reasonable idea what the number of workers should be set to, or whether it will really resolve the issue. Can someone help me understand what would trigger this specific error and how I should go about addressing it?

Inference script:



import subprocess
import sys
import json
def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

res = install('boto3')

print('\ninstall res', res)
    
import os
import numpy as np
import torch
import boto3
from transformers import AutoModel, AutoTokenizer, AutoModelForMaskedLM
from importlib import reload    
print('\nboto3 loaded')    

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('\ndevice!::',device)
    
def forward_pass(batch, model):
    input_ids = torch.tensor(batch["input_ids"]).to(device)
    attention_mask = torch.tensor(batch["attention_mask"]).to(device)

    with torch.no_grad():
        last_hidden_state = model(input_ids, attention_mask).last_hidden_state
        last_hidden_state = last_hidden_state.cpu().numpy()

    # Use average of unmasked hidden states for classification
    lhs_shape = last_hidden_state.shape
    boolean_mask = ~np.array(batch["attention_mask"]).astype(bool)
    boolean_mask = np.repeat(boolean_mask, lhs_shape[-1], axis=-1)
    boolean_mask = boolean_mask.reshape(lhs_shape)
    masked_mean = np.ma.array(last_hidden_state, mask=boolean_mask).mean(axis=1)
    batch["hidden_state"] = masked_mean.data
    return batch

def preprocess_function(examples):
    print('attempting to tokenize.')
    t = tokenizer([examples["source"]], truncation=True)
    t['source'] = examples["source"]
    return t

tokenizer = AutoTokenizer.from_pretrained('/opt/ml/model')

def model_fn(model_dir):
    print('\nIn model_fn')
    print('\nmodel_dir::', model_dir)
    model = AutoModel.from_pretrained('/opt/ml/model',  output_hidden_states=True).to(device)
    
    print('\nmodel read in')

    
    return model

for path, subdirs, files in os.walk('/opt/ml'): 
    for name in files: print(os.path.join(path, name))

print('\ntokenizer got')
        
def input_fn(data, content_type):
    print('\nin data', data, content_type, type(data))
    request = json.loads(data)
    
    # preprocess dataset
    print('attempting preprocess')
    response = preprocess_function(request)
    print('response', response)
    
    return response

def predict_fn(data, model):
    print('\nin predict:', data)
    res = forward_pass(data, model)
    return res

def output_fn(prediction, accept):
    print('\nin output',  type(prediction))
    j = [{"inputs":prediction.input_ids, "source":prediction.source,"embeddings":prediction.hidden_state.tolist()}]
    return json.dumps(j)

Hello @MaximusDecimusMeridi,

By default, the Hugging Face Inference DLC starts as many workers as the instance has CPU cores, so an ml.m5.xlarge instance gets 4 workers.

Regarding the error you see:

  • Are you using Multi-Model Endpoints?
  • What was the memory utilization?
  • How long does the request take? → It is possible that all workers were blocked, either by long-running inference or by a deadlock inside your code, and never finished, so no new requests could be accepted.
  • Could you try updating to the latest image? Reference
  • “During feature extraction another endpoint is being called for generating text embeddings” → Does this mean the endpoint that returned the 503 calls another endpoint? (I couldn’t find anything in the script.) If so, point 3 might be the reason, since you would block the worker until the inner request is resolved, and generation can take quite long.

P.S. feel free to share a proper architecture on what you do. Happy to potentially improve it and solve those bottlenecks with a more async approach.

Yes, you are correct, it is calling the embedding model as a separate service, and latency seems to be the issue (at least it coincides with the errors). I am now trying to deploy them as containers on the same endpoint with SageMaker inference pipelines.

So currently I have a setup where I have a sagemaker inference pipeline with a preprocessing container, and then a classifier. In the preprocessing container I call the embedding endpoint and extract some other features. Here is the script in the preprocessing container:

def transform(payload):

    json_data = json.loads(payload)

    # Pass unprocessed description
    payload = {
        "source": json_data['description']
    }

    response = embed_predictor.predict(data=json.dumps(payload))

    vec = json.loads(response)[0]['embeddings'][0]

    df = pd.DataFrame([meta])

    features_to_append = extract_features(df)

    feat_vals = df[features_to_append].values

    vec = vec + feat_vals[0].tolist()

    return [vec]

So it’s when this transform function is run that the Hugging Face embedding endpoint is called, which runs the inference script provided in the initial post. Happy to share more detail. I also suspect the issue is calling the embedding endpoint as a separate service. Memory utilization on either endpoint is low, with a max of ~5% or so. I did try to stress test the endpoint and had no issue, but it is possible the load was not as high in requests per second as the spike (I see bursts where we are hit with 200 requests in a second).

I’ve gotten the full inference pipeline to work now. It uses three stacked containers:

  1. Huggingface embedding container
  2. Additional feature extraction (a few date features etc)
  3. Classifier that outputs predictions

It is working fine, and the response time is about 200 ms. But so was the previous endpoint :stuck_out_tongue: I guess I have to run a more intense load test to see if this handles it better?

For clarity, the previous setup (with worker issues) was having two endpoints - a huggingface embedding endpoint, and a Sagemaker inference pipeline with 2 containers:

  1. Extract date features, and calls the huggingface embedding endpoint
  2. Classifier that outputs prediction

Thanks for the help!

If you wouldn’t mind, could you explain this a bit further? Intuitively it makes sense, but I want to make sure I understand the architecture. Why wouldn’t making requests directly to the endpoint have the same worker tie-up issue? If the request data size is consistent (each endpoint currently only accepts a single data point), is there a reason why latency would go up?

If you used, e.g., AWS Lambda to make the requests to your models, you could do everything asynchronously. Meaning

Lambda → worker 1
Lambda → worker 2

and not
Lambda → worker 1 → worker 2

This might not make a difference when you have 1 request, but let’s say you have 10 in parallel: then worker 1 wouldn’t block until the inner request to worker 2 is done, and it can handle another request. By making the request inside worker 1, you automatically extend the compute time of worker 1 by the duration of that request.
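
To illustrate the point about nested calls, here is a minimal sketch of a fixed pool of workers with no request queue. It is a simplified model for intuition only, not how the SageMaker model server is actually implemented: the function name simulate_rejections, the 20 ms arrival spacing, and the 50 ms and 250 ms service times are all assumptions chosen for the example. A request that arrives while every worker is busy is counted as rejected, which stands in for the 503 response.

```python
def simulate_rejections(arrivals_ms, service_ms, workers):
    """Count requests that arrive while every worker is busy.

    Simplified model (an assumption, not the real server): a fixed pool,
    no queue, and every request holds a worker for exactly service_ms.
    """
    free_at = [0] * workers  # time (ms) at which each worker is free again
    rejected = 0
    for t in arrivals_ms:
        for i, ready in enumerate(free_at):
            if ready <= t:
                free_at[i] = t + service_ms  # worker i takes the request
                break
        else:
            rejected += 1  # no free worker: this models the 503
    return rejected


# One request every 20 ms (about 50 requests per second) for roughly one second.
arrivals = list(range(0, 1040, 20))

# Hypothetical timings: 50 ms of own compute, or 250 ms when the worker
# also waits for a nested call to a second endpoint.
direct = simulate_rejections(arrivals, service_ms=50, workers=4)
nested = simulate_rejections(arrivals, service_ms=250, workers=4)
print("rejected without nested call:", direct)
print("rejected with nested call:", nested)
```

Between the two runs only the time each request holds a worker changes; the pool size and arrival rate stay the same. As a rough rule of thumb, a pool needs about (arrival rate × hold time) workers to keep up.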

Thanks. We do have a lambda that makes the prediction call to the endpoint. Where can I read more about how to route a request to specific workers and avoiding blocking request? When I look at the logs, by the output it looks like endpoint one is processing requests in parallel (the log output is not in sequence and is jumbled up). So it looks like the first endpoint will send the requests in parallel to the embedding endpoint. But I’m not understanding how I control routing requests to individual workers in parallel vs have them queue up (i.e. Lambda → worker 1 → worker2). Is there a starting point for me to get a better general understanding how this works? :pray: :pray: :pray:

@philschmid I have now set up the containers to run in order on the same endpoint as a SageMaker inference pipeline, with container 1 being the Hugging Face embedding container. I am still getting the error “no worker is available to serve request: model” from container 1 when the request volume goes up. I am using an ml.m5.4xlarge instance and will try a GPU next, but that should not make a difference since I am sending single requests, so there is nothing to parallelize? Is the “no workers” error simply coming from it being overloaded by requests? And if so, what is the solution, just splitting it across multiple instances? I’m having a hard time getting any kind of estimate of what request load will trigger these errors, other than by trial and error.

Edit: when I tried with g4dn.xlarge I got a “CUDA out of memory” error:
RuntimeError: CUDA out of memory. Tried to allocate 20.00 MiB (GPU 0; 14.76 GiB total capacity; 182.14 MiB already allocated; 22.44 MiB free; 192.00 MiB reserved in total by PyTorch)

I have only ever seen the error “no worker is available to serve request: model” in this conversation with you. So it is really strange, and it should be related either to your inference code or to how you deploy the endpoints.

Could you please share the exact code you use to deploy the endpoints and which model you use?

@philschmid I’ve included the training and inference scripts, as well as the sagemaker train and deploy code below. Let me know if I can provide any other detail - thanks.

  • train.py
  • inference.py
  • Launch Sagemaker Training Job
  • SageMaker Endpoint deployment

train.py


import os
import ast
print(os.system('python -m pip install datasets --upgrade'))

from transformers import (
    AutoModel,
    Trainer,
    TrainingArguments,
    DataCollatorForLanguageModeling,
    AutoTokenizer,
    AutoFeatureExtractor,
    AutoModelForMaskedLM,
    default_data_collator,
    AutoModelForSequenceClassification
)

from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from datasets import load_dataset, Dataset
import random
import logging
import sys
import argparse
import torch
import numpy as np
import pandas as pd
import datasets

print('datasets.__version__', datasets.__version__)

if __name__ == "__main__":

    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--train_batch_size", type=int, default=32)
    parser.add_argument("--eval_batch_size", type=int, default=64)
    parser.add_argument("--warmup_steps", type=int, default=500)
    parser.add_argument("--model_id", type=str)
    parser.add_argument("--num_labels", type=str)
    parser.add_argument("--labels", type=str)
    parser.add_argument("--learning_rate", type=float, default=5e-5)
    parser.add_argument("--train_file", type=str, default="train.DbEmbeddings")
    parser.add_argument("--test_file", type=str, default="test.DbEmbeddings")
    parser.add_argument("--fp16", type=bool, default=True)

    # Data, model, and output directories
    parser.add_argument("--output_data_dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"])
    parser.add_argument("--model_dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--n_gpus", type=str, default=os.environ["SM_NUM_GPUS"])
    parser.add_argument("--training_dir", type=str, default=os.environ["SM_CHANNEL_TRAIN"])
    parser.add_argument("--test_dir", type=str, default=os.environ["SM_CHANNEL_TEST"])

    args, _ = parser.parse_known_args()

    # Set up logging
    logger = logging.getLogger(__name__)

    logging.basicConfig(
        level=logging.getLevelName("INFO"),
        handlers=[logging.StreamHandler(sys.stdout)],
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    
    print('\nWalk 1:')
    for path, subdirs, files in os.walk('/opt/ml'): 
        for name in files: print(os.path.join(path, name))
          
    raw_train_dataset = load_dataset("json", data_files=os.path.join(args.training_dir, args.train_file), cache_dir="opt/ml/input")["train"]
    raw_test_dataset = load_dataset("json", data_files=os.path.join(args.test_dir, args.test_file), cache_dir="opt/ml/input")["train"]
    
    
    print('\nargs.labels', args.labels)
    print('type args.labels', type(args.labels))
    print('type args.num_labels', args.num_labels)
    
    args.labels = ast.literal_eval(args.labels)
    args.num_labels = len(args.labels)
    
    print('\nargs.labels', args.labels)
    print('type args.labels', type(args.labels))
    print('type args.num_labels', args.num_labels)
    
    
    print('type(args.num_labels)', type(args.num_labels))
    raw_train_dataset = raw_train_dataset.cast_column("label", datasets.ClassLabel(num_classes=args.num_labels, 
                                                                                   names= args.labels, names_file=None, id=None))
    
    print('\nraw_train_dataset.features', raw_train_dataset.features)

    # load tokenizer
    tokenizer = AutoTokenizer.from_pretrained(args.model_id)



    def tokenize(examples):
        result = tokenizer(examples["source"], padding=True, truncation=True)

        return result


    # Use batched=True to activate fast multithreading!
    train_dataset = raw_train_dataset.map(
        tokenize, batched=True, batch_size=None
    )
    test_dataset = raw_test_dataset.map(
        tokenize, batched=True, batch_size=None
    )


    train_dataset.reset_format()
    test_dataset.reset_format()
    
    train_dataset.set_format("torch",
                                columns=["input_ids", "attention_mask", "label"])

    test_dataset.set_format(type="pandas")
    df = test_dataset[:]
    df_test, df_valid = np.split(df, [int(.5*len(df))])
    test_data = Dataset.from_pandas(df_test)
    valid_data = Dataset.from_pandas(df_valid)

    test_data = test_data.cast_column("label", datasets.ClassLabel(num_classes=args.num_labels, 
                                                                 names= args.labels , 
                                                                   names_file=None, id=None))

    valid_data = valid_data.cast_column("label", datasets.ClassLabel(num_classes=args.num_labels, 
                                                                 names= args.labels , names_file=None, 
                                                                   id=None))

    test_data.reset_format()
    test_data.set_format("torch",
                                columns=["input_ids", "attention_mask", "label"])

    valid_data.reset_format()
    valid_data.set_format("torch",
                                columns=["input_ids", "attention_mask", "label"])

    from sklearn.metrics import accuracy_score, f1_score

    def compute_metrics(pred):
        labels = pred.label_ids
        preds = pred.predictions.argmax(-1)
        f1 = f1_score(labels, preds, average="weighted")
        acc = accuracy_score(labels, preds)
        return {"accuracy": acc, "f1": f1}    
        # Saves the model to s3



    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model = (AutoModelForSequenceClassification
             .from_pretrained(args.model_id, num_labels=args.num_labels)
             .to(device))                      

    batch_size = 64
    logging_steps = len(train_dataset) // batch_size
    print('device', device)
    print('logging steps', logging_steps)
    print('len(train_dataset)', len(train_dataset))
    
    model_name = f"{args.model_id}-finetuned-d"
    training_args = TrainingArguments(output_dir=model_name,
                                      num_train_epochs=args.epochs,
                                      learning_rate=2e-5,
                                      per_device_train_batch_size=batch_size,
                                      per_device_eval_batch_size=batch_size,
                                      weight_decay=0.01,
                                      evaluation_strategy="epoch",
                                      disable_tqdm=False,
                                      logging_steps=logging_steps,
                                      push_to_hub=False,
                                     )                       


    trainer = Trainer(model=model, args=training_args,
                      compute_metrics=compute_metrics,
                      train_dataset=train_dataset,
                      eval_dataset=valid_data,
                      tokenizer=tokenizer)                       
    trainer.train()
    
    

    preds_output = trainer.predict(test_data)
    
    print('.')
    print('preds_output.metrics:')
    print(preds_output.metrics)
            
    
    trainer.save_model(args.model_dir)
    
                       
    print(f'my_acc: {preds_output.metrics["test_accuracy"]}')
                           
    print('\nModel Walk:')
                           
    for path, subdirs, files in os.walk('/opt/ml'): 
        for name in files: print(os.path.join(path, name))

inference.py


import subprocess
import sys
import json
import os
import numpy as np
import torch
import boto3
from transformers import AutoModel, AutoTokenizer, AutoModelForMaskedLM
from importlib import reload    
print('\nboto3 loaded')    

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('\ndevice:',device)
    
def forward_pass(batch, model):
    input_ids = torch.tensor(batch["input_ids"]).to(device)
    attention_mask = torch.tensor(batch["attention_mask"]).to(device)

    with torch.no_grad():
        last_hidden_state = model(input_ids, attention_mask).last_hidden_state
        last_hidden_state = last_hidden_state.cpu().numpy()

    # Use average of unmasked hidden states for classification
    lhs_shape = last_hidden_state.shape
    boolean_mask = ~np.array(batch["attention_mask"]).astype(bool)
    boolean_mask = np.repeat(boolean_mask, lhs_shape[-1], axis=-1)
    boolean_mask = boolean_mask.reshape(lhs_shape)
    masked_mean = np.ma.array(last_hidden_state, mask=boolean_mask).mean(axis=1)
    batch["hidden_state"] = masked_mean.data
    return batch


def preprocess_function(examples):
    print('attempting to tokenize:', examples)
    if examples['type'] == 'incomeType':
        my_type = 'income'
    else:
        my_type = 'expense'
    print('my_type', my_type)    
    
    t = tokenizer([examples[my_type]["description"]], truncation=True)
    t['description'] = examples[my_type]["description"]
    t['date'] = examples[my_type]["date"]
    t['amount'] = examples[my_type]["amount"]
    t['type'] = examples['type']
    print('t', t)
    return t

print('\nos.getcwd()', os.getcwd())
print('\nModel Walk:')
for path, subdirs, files in os.walk('/opt/ml'): 
    for name in files: print(os.path.join(path, name))
tokenizer = AutoTokenizer.from_pretrained('/opt/ml/model')

def model_fn(model_dir):
    print('\nIn model_fn')
    print('\nmodel_dir:', model_dir)
    model = AutoModel.from_pretrained('/opt/ml/model',  output_hidden_states=True).to(device)
    
    return model


print('\ninference directory', os.listdir(os.curdir))

print('\nWalk:')

for path, subdirs, files in os.walk('/opt/ml'): 
    for name in files: print(os.path.join(path, name))

        
def input_fn(data, content_type):
    print('\nin data', data, content_type, type(data))
    request = json.loads(data)
    
    # preprocess dataset
    print('attempting preprocess')
    print('request', request)
    response = preprocess_function(request)
    print('response', response)
    print('\nfwd pass')
    
    return response

def predict_fn(data, model):
    print('\nin predict:', data)
    res2 = forward_pass(data, model)
    return res2

def output_fn(prediction, accept):
    print('\nin output',  type(prediction))
    j = [{ "description":prediction.description,
          "amount":prediction.amount,
          "type": prediction.type,
          "date":prediction.date,
          "inputs":prediction.input_ids, 
          "embeddings":prediction.hidden_state.tolist()}]
    return json.dumps(j)

Launch Sagemaker Training Job


hyperparameters={'epochs': 1,                          # number of training epochs
     'labels' : self.preprocessor.modeldata.labels,
     'num_labels' : len(self.preprocessor.modeldata.labels),                             
     'train_batch_size': 32,               # batch size for training
     'eval_batch_size': 64,                # batch size for evaluation
     'learning_rate': 0.0001,                # learning rate used during training
     'model_id':self.hf_model_id, # pre-trained model
    'weight_decay': 0.0005,
     'fp16': True,                         # Whether to use 16-bit (mixed) precision training
     }

metric_definitions=[{"Name": "my:acc", "Regex": "my_acc: (\S+)"},
            {'Name': 'eval_loss',               'Regex': "'eval_loss': ([0-9]+(.|e\-)[0-9]+),?"},
            {'Name': 'eval_accuracy',           'Regex': "'eval_accuracy': ([0-9]+(.|e\-)[0-9]+),?"},
            {'Name': 'eval_f1',                 'Regex': "'eval_f1': ([0-9]+(.|e\-)[0-9]+),?"},
            {'Name': 'eval_precision',          'Regex': "'eval_precision': ([0-9]+(.|e\-)[0-9]+),?"}]
objective_metric_name = 'eval_loss'
objective_type = "Minimize"#Maximize

instance_type = 'ml.g4dn.2xlarge'

# create the Estimator
estimator = HuggingFace(
    entry_point          = 'train.py',        # fine-tuning script used in training job
    source_dir           = 'embed_source',      # directory where fine-tuning script is stored
    instance_type        = instance_type,   # instances type used for the training job
    instance_count       = 1,                 # the number of instances used for training
    role                 = get_execution_role(), # IAM role used in training job to access AWS resources
    transformers_version = '4.6',             # the transformers version used in the training job
    max_run= 36000,
    pytorch_version      = '1.7',             # the pytorch_version version used in the training job
    py_version           = 'py36',            # the python version used in the training job
    hyperparameters      = hyperparameters,   # the hyperparameter used for running the training job
    metric_definitions   = metric_definitions, # the metrics regex definitions to extract logs
    output_path=os.path.join(dataconnector.version_s3_prefix,  "models"),
    code_location=os.path.join(dataconnector.version_s3_prefix,  "models")
)

inputs = {
    'train': s3_input,
    'test': s3_input_eval
}


self.training_job_name = 'trainjob-' + dataconnector.model_name
print('\ninputs', inputs)
estimator.fit(inputs=inputs, wait=True,
              job_name=self.training_job_name, 
              logs=True)

SageMaker Endpoint deployment

Note that in the code below I’ve commented out model_server_workers = 4. It gives the same error regardless. However, if I don’t set this, the log shows “Default workers per model: 1”; if I do set model_server_workers = 4, the log shows “Default workers per model: 4”, but it still throws the worker error.

emb_sm_model = HuggingFaceModel(
    transformers_version="4.6",  # transformers version used
    pytorch_version="1.7",  # pytorch version used
    py_version='py36',  # python version used
    entry_point='embed_source/inference.py',  # custom inference script
    model_data=emb_model.model_artifacts,
    # model_server_workers=4,
    sagemaker_session=sagemaker_session,
    name=emb_name,
    role=role,
)


# I've omitted code for the second and third containers, as the error is happening in container 1/the huggingface container

sm_model = PipelineModel(name=pipeline_name, role=role, models=[emb_sm_model, feat_process_sm_model, clf_sm_model])


sm_model.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge',
                endpoint_name=pipeline_name, wait=False)

I’ve tried different instance types (‘ml.m5.2xlarge’, ‘ml.m5.4xlarge’). The only thing that seems to mitigate the error is increasing the instance count, but even then I run into the same error when the request load gets higher.

Thank you for all the information @MaximusDecimusMeridi!

That was helpful to better understand what’s happening. I think the root cause of your error is the PipelineModel. Reading the sagemaker-sdk documentation and an example, it looks like the pipeline is treated as one big endpoint.
Meaning if you send a request to it, it will be blocked until the whole pipeline has been served, i.e. all 3 models, which can take time on CPU.
So the explanation for “no worker is available to serve request: model” is either that SageMaker errors out when all workers are busy, or that the prediction of model 2 in your pipeline takes longer than that of model 1, in which case the bottleneck would be at model 2. For example, you get a lot of requests and model 1 can handle them fast, but model 2 is still busy with an older request, and then no workers are available for the new one.

The solutions that come to mind are either to create the pipeline yourself with AWS Lambda, meaning you deploy your models as individual endpoints, or as a multi-model endpoint or multi-container endpoint, and then the Lambda calls the models one after another; or to adjust your inference.py to include all 3 models in one endpoint, which might be less efficient to call.
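
A Lambda-style orchestration along these lines might look like the sketch below. It assumes two separate endpoints with the hypothetical names embedding-endpoint and classifier-endpoint, assumes the embedding endpoint accepts {"source": ...} and returns the list format produced by the inference script earlier in the thread, and assumes the classifier accepts a JSON list of feature vectors. The runtime argument is expected to be a boto3 sagemaker-runtime client, passed in so the function can be exercised with a stub.

```python
import json


def classify(runtime, description, extra_features,
             embed_endpoint="embedding-endpoint",   # hypothetical endpoint name
             clf_endpoint="classifier-endpoint"):   # hypothetical endpoint name
    """Call the embedding endpoint, append features, then call the classifier.

    `runtime` is assumed to behave like boto3.client("sagemaker-runtime").
    """
    # Step 1: get the embedding for the raw text.
    emb_resp = runtime.invoke_endpoint(
        EndpointName=embed_endpoint,
        ContentType="application/json",
        Body=json.dumps({"source": description}),
    )
    # Response shape assumed from the thread's inference script:
    # [{"embeddings": [[...]], ...}]
    vec = json.loads(emb_resp["Body"].read())[0]["embeddings"][0]

    # Step 2: append the extra features computed inside the Lambda.
    features = vec + list(extra_features)

    # Step 3: send the combined vector to the classifier
    # (the request format here is an assumption).
    clf_resp = runtime.invoke_endpoint(
        EndpointName=clf_endpoint,
        ContentType="application/json",
        Body=json.dumps([features]),
    )
    return json.loads(clf_resp["Body"].read())
```

In a real Lambda handler, runtime would be created once outside the handler, for example with boto3.client("sagemaker-runtime"). With this layout neither endpoint waits on the other; only the Lambda waits.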

Thanks for the answer. Using embeddings for a downstream classification task must be a pretty common usecase, is there a best practice for how to do this in production?

Summarizing the different approaches.

Inference pipeline
From your explanation this seems inherently problematic, and just not the right tool for this job. I guess the heavy-lifting embedding model at the beginning is what causes the issues. My current setup looks like this:

Container 1 - HF embedding container
Container 2 - extract/append additional features
Container 3 - run final classification model

And I keep getting the worker error unless I spread the requests across a sufficient number of instances.

multi-model endpoint

" You cannot mix and match frameworks for models with a Multi-Model Endpoint"

So I would then need to use the HF pytorch container for everything? It seems models for multi model endpoints expect an image uri and the model artifacts and that’s it.

Multi model Supported Algorithms and Frameworks

Does the HF container support multi-model endpoints, or would it need to be modified?

Multi-model endpoints are not supported on GPU instance types.

Currently I’m not using GPU for inference, as I am processing one request at a time. However, I am planning on processing small batch requests (< 1K records) directly on the endpoint. Would it make sense to use GPU for 1K records, or is that not enough to take advantage of GPU parallelism?

Multi-model endpoints also enable time-sharing of memory resources across your models. This works best when the models are fairly similar in size and invocation latency. When this is the case, multi-model endpoints can effectively use instances across all models. If you have models that have significantly higher transactions per second (TPS) or latency requirements, we recommend hosting them on dedicated endpoints.

That seems again like a similar problem. My classifier model is much smaller than the embedding transformer model. From this alone it sounds like I should go the dedicated endpoints route. It seems like multi model containers are more geared for different flavors of the same model, vs a pipeline of different models. So if I have multiple embeddings models (which I plan on) they can all be hosted together on single embedding multi model endpoint. The only limitation then would be that I can’t use GPU for inference (but it’s not clear that I would want to for realtime inference).

Multi container endpoint

I found your post from just a couple of weeks ago. How would I create a container that outputs embeddings instead of one of the default tasks? I achieve this by using the inference entrypoint script above. Would I be able to do something similar?

From the docs

we recommend that the model in each container exhibits similar CPU utilization and latency on each inference request.

This seems like a red flag that I may run into issues similar to the inference pipeline, with different processing times per container. But the above seems to relate only to autoscaling, so if I set the policy to be based on the number of requests it should be fine?

If I could somehow pass an entrypoint script to the container to control the output I could then host the embedding container and classifier container on a single endpoint. Then I would call these via a lambda, that first gets the embeddigns from container 1, then the lambda would append some additional features before calling the classifier container (again, I am using entrypoint script here to control the output).
EDIT: reading your embedding blog post - would it just be a matter of packaging the inference.py script in the code folder for the model.tar.gz artifacts?
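
For reference, a minimal sketch of that packaging step, assuming the layout where the custom handler sits at code/inference.py inside model.tar.gz. The function name package_model and its arguments are hypothetical, and only top-level files in the model directory are included.

```python
import tarfile
from pathlib import Path
from typing import List


def package_model(model_dir: str, inference_script: str,
                  output_path: str = "model.tar.gz") -> List[str]:
    """Bundle model files plus code/inference.py into one archive.

    Hypothetical helper: returns the member names so the layout can be checked.
    """
    with tarfile.open(output_path, "w:gz") as tar:
        # Weights, config and tokenizer files go at the archive root.
        for path in sorted(Path(model_dir).iterdir()):
            if path.is_file():
                tar.add(str(path), arcname=path.name)
        # The custom handler goes under code/ (assumed lookup location).
        tar.add(inference_script, arcname="code/inference.py")
    with tarfile.open(output_path, "r:gz") as tar:
        return tar.getnames()
```

The resulting archive would then be uploaded to S3 and passed as model_data, as in the HuggingFaceModel call at the top of the thread.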

So in summary, for using hf embeddings for downstream classification tasks:

  • Inference pipeline - not a good use case (issues with processing the pipeline linearly)
  • Multi model - not a good use case, due to differences in model sizes (embedding model vs classifier), no gpu, and limitation of using same framework throughout
  • Multi container - could work if I can pass entrypoint script
  • Dedicated endpoints - should work but requires more endpoints and higher cost

Both multi container and dedicated endpoints would have the same process flow though - a lambda that first makes a call for embeddings, attaches additional features and then makes a call for classification. It would just go either to two separate endpoints, or to two containers on the same endpoint. Does this sound about right?
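
A rough sketch of that flow, written so the same function covers both options. The endpoint names, the {"inputs": ...} payload shape and the assumption that the embedding call returns a flat JSON list are all hypothetical. runtime is expected to be a boto3 "sagemaker-runtime" client, and TargetContainerHostname is only sent when a container name is given (the multi-container, direct invocation case).

```python
import json


def classify(runtime, text, extra_features,
             embed_endpoint="embedding-endpoint",   # hypothetical endpoint name
             clf_endpoint="classifier-endpoint",    # hypothetical endpoint name
             embed_container=None, clf_container=None):
    """Get embeddings, append extra features, then call the classifier."""

    def invoke(endpoint, payload, container=None):
        kwargs = {
            "EndpointName": endpoint,
            "ContentType": "application/json",
            "Body": json.dumps(payload),
        }
        # Only needed when several containers share one endpoint.
        if container is not None:
            kwargs["TargetContainerHostname"] = container
        response = runtime.invoke_endpoint(**kwargs)
        return json.loads(response["Body"].read())

    # Step 1: embeddings (assumed to come back as a flat list of floats).
    embedding = invoke(embed_endpoint, {"inputs": text}, embed_container)
    # Step 2: attach the additional features computed in the lambda.
    features = list(embedding) + list(extra_features)
    # Step 3: classification on the combined feature vector.
    return invoke(clf_endpoint, {"inputs": features}, clf_container)
```

With dedicated endpoints the two endpoint names differ and the container arguments stay None. With a multi-container endpoint both names would be the same and the two container hostnames select the model.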

Thanks again for the help!!

I’m not sure I follow this - in the documentation, invoking a lambda function asynchronously means the invoke does not wait for a response, so how would that work for an inference endpoint? Or do you mean setting it up as an async endpoint? Thanks

Maybe I didn’t articulate myself so well. I didn’t mean invoking the lambda asynchronously or deploying an async endpoint.
I was referring to running code inside the AWS lambda asynchronously (Example). But from your latest post, you explained that this would not be possible since you always need the response from the first endpoint.
Additionally, I am not sure how the SageMaker Pipeline behaves for the inner models. Let’s assume this is the pipeline:
Request → model 1 → model 2 → model 3 → response

The question is whether model 1 can serve predictions independently of the whole pipeline, meaning that while one pipeline request is not yet completed, it can already serve a request from another pipeline request.

OK I’m following now, thanks for clarifying.

I tried two solutions

  1. House all three containers on the same inference pipeline endpoint
  2. Using a lambda to invoke the endpoints, as well as do some light feature extraction using pandas

For 1, hosting them on the same endpoint made latency worse. I guess that was to be expected, but it should confirm that the containers lock for the duration of the endpoint request.

2 performed much better in terms of latency. There are also 2 processors in my lambda, so it should be possible to make concurrent requests to two endpoints. In my case, the second endpoint needs the input from the first. But say I have an embedding model and some other feature processing endpoint: I could call them concurrently and then feed the results forward to the classifier.
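
A small sketch of that concurrent variant using a thread pool, which suits calls that mostly wait on the network. The two callables are hypothetical stand-ins for whatever invokes the embedding endpoint and the other feature endpoint, and both are assumed to return a list.

```python
from concurrent.futures import ThreadPoolExecutor


def gather_features(text, get_embedding, get_other_features):
    """Run two independent feature calls in parallel and merge the results."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        embedding_future = pool.submit(get_embedding, text)
        other_future = pool.submit(get_other_features, text)
        # result() blocks until the call finishes and re-raises its exception, if any.
        return list(embedding_future.result()) + list(other_future.result())
```

The merged list can then be sent to the classifier in a single follow-up call, so only the classifier step has to wait for both results.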

  • I need to deploy with 3 ml.m5.xlarge instances to be able to handle a sustained request volume of 100-150 rps.
  • 2 ml.m5.xlarge can handle it for a few seconds.
  • Instance size does not seem to matter as long as it is big enough to load the model.

Since both endpoints are called sequentially inside the lambda, the whole pipeline locks for each request. I’m thinking of ways to make it async, but I guess there is a tradeoff with having to pick up results from somewhere like S3. For now I have a working solution, but I’ll continue to look for ways to improve it, maybe by reducing the model size etc.

Another thing that will probably help is to start processing requests in small batches. I.e. if someone sends 1000 requests, they get treated as a single request instead of 1000 separate ones. That should remove some of the concurrency issues.
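
The batching idea could look roughly like this, assuming the inputs arrive as a list and the endpoint accepts a list per invocation. The helper name chunked and the batch size are illustrative only.

```python
def chunked(items, batch_size):
    """Yield consecutive slices of items, each at most batch_size long."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]
```

With a batch size of 100, 1000 incoming texts would turn into 10 endpoint invocations instead of 1000.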

Thanks again for your help!