How to pass pipeline parameters when using the SageMaker DataSerializer?

Hi folks, we’re trying to deploy an ASR model to SageMaker, but we’re getting hung up on how to pass pipeline parameters to the endpoint when using DataSerializer (which seems to be necessary for audio input).

For example, to deploy and call an ASR model (in this case HuBERT), we can do it like this:

from sagemaker.serializers import DataSerializer

# create a serializer for the data
audio_serializer = DataSerializer(content_type='audio/x-audio') # using x-audio to support multiple audio formats

# deploy model to SageMaker Inference
predictor = huggingface_model.deploy(
    initial_instance_count=1, # number of instances
    instance_type='ml.m5.xlarge', # ec2 instance type
    serializer=audio_serializer, # serializer for our audio data
)

res = predictor.predict(data = "sample1.flac")

print(res)

{'text': "GOING ALONG SLUSHY COUNTRY ROADS AND SPEAKING TO DAMP AUDIENCES IN DRAUGHTY SCHOOLROOMS DAY AFTER DAY FOR A FORTNIGHT HE'LL HAVE TO PUT IN AN APPEARANCE AT SOME PLACE OF WORSHIP ON SUNDAY MORNING AND HE CAN COME TO US IMMEDIATELY AFTERWARDS"}

So far so good, but in our case we’re dealing with longer audio files, and need to be able to pass the chunk_length_s and stride_length_s parameters.

If you’re not using a serializer, it appears to be as simple as e.g.

import json
import boto3

sagemaker_runtime = boto3.client("sagemaker-runtime")

payload = {"inputs": input, "parameters": params}

response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType='application/json',
    Body=json.dumps(payload)
)

But because we’re using a serializer, we have to call e.g. predictor.predict(data="sample1.flac"), and that doesn’t seem to provide a way to include the pipeline parameters we need.

For example, if we try:

payload = {
    "inputs": "sample1.flac",
    "parameters": {
        "chunk_length_s" : 5
    }
}
res = predictor.predict(payload)

We receive the error ValueError: Object of type <class 'dict'> is not Data serializable. This makes sense, since DataSerializer expects a file path or raw bytes.

So, how do we solve this? How do we correctly pass audio data into the SageMaker endpoint while also including pipeline parameters like chunk_length_s and stride_length_s so that the model operates correctly?

Perhaps @philschmid or @marshmellow77 would be willing to chime in?

Help! :slight_smile:

Given that the SageMaker Hugging Face Inference Toolkit builds on top of the pipeline feature, I took a look at the pipeline documentation for ASR, and it seems to me that parameters like chunk_length_s and stride_length_s are specified when creating the pipeline, not at every inference request. I don’t have enough experience with ASR to say whether that makes sense, but that’s what it looks like to me.
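
To make that concrete, here is a quick local sketch of the creation-time form (untested; the checkpoint name is just a placeholder):

from transformers import pipeline

# chunk/stride lengths set once, when the pipeline is created
asr = pipeline(
    "automatic-speech-recognition",
    model="facebook/hubert-large-ls960-ft",  # placeholder checkpoint
    chunk_length_s=20,
    stride_length_s=5,
)

print(asr("sample1.flac")["text"])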

Now, how to fix your problem with that information? Again, I have very little experience with ASR workloads, but at the very least I would think you could create a custom inference script, create and use the ASR pipeline in that script, and pass the parameters to the endpoint when creating it with the deploy() method via the env dictionary. It should be something along the lines of

model.deploy(..., env={'chunk_length_s': '5', 'stride_length_s': '10'}, ...)  # env values must be strings

and in the inference script:

def model_fn(model_dir):
    pipe = pipeline("automatic-speech-recognition", chunk_length_s=int(os.environ['chunk_length_s']), ...
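
Spelled out a bit more, that script might look something like this (untested sketch; the env variable names match the deploy() call above, and the values need casting back from strings):

import os
from transformers import pipeline

def model_fn(model_dir):
    # environment variables arrive as strings, so cast them back to numbers
    return pipeline(
        "automatic-speech-recognition",
        model=model_dir,
        chunk_length_s=int(os.environ.get("chunk_length_s", "20")),
        stride_length_s=int(os.environ.get("stride_length_s", "5")),
    )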

I haven’t tested this but maybe give it a try? Or, at the very least, I hope it sparks some other ideas how to go about this :slight_smile:

Cheers
Heiko


Thanks for the quick reply @marshmellow77! That’s an interesting point about being able to set those parameters at pipeline creation rather than at inference time (to my knowledge you can set them at each request, as outlined here: Making automatic speech recognition work on large files with Wav2Vec2 in :hugs: Transformers). I haven’t tried whether they can also be set at pipeline creation, so I’ll be sure to test that.

If we are able to set ASR pipeline parameters upon model deployment, is there a way to do that without a custom inference script? i.e. is there a way to pass pipeline-specific parameters to SageMaker at model deployment? We’d love to avoid a custom inference script if possible for speed of iteration, but we’ll definitely go that route if it’s the only option.

Something like (this is obviously wrong, just an example):

# deploy model to SageMaker Inference
predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    pipeline_kwargs={'chunk_length_s': 20, 'stride_length_s': 5}, # can we do something like this?
    serializer=audio_serializer,
)

e.g. assuming the ASR pipeline can accept chunk_length_s and stride_length_s at instantiation, is there a way to pass those to either the HuggingFaceModel object or the model.deploy() method itself?

As an aside: if these parameters actually cannot be set at pipeline creation, is there a way for us to pass them somehow in predictor.predict, or would we have to use a custom inference script at that point? Something like (again, this is wrong, just an example):

res = predictor.predict(data = "sample1.flac", pipe_kwargs={'chunk_length_s': 20, 'stride_length_s': 5})

Yes, you’re right, you can also provide those parameters with each inference request. I should have looked at the code first before blurting out my ignorance :laughing: Shows you how little I know about ASR :see_no_evil: But it should also be possible to set them at pipeline creation, according to this.

Looking at the code of the SM Python SDK and the SM Hugging Face Inference Toolkit (SMHFIT), it seems to me that this is indeed a use case that is not covered when calling the predict() method.

Not sure what the best way forward is, because I believe this means that even a custom inference script would not help: the exception is raised by the SM Python SDK, which runs before the SMHFIT is ever called … :thinking:

Hmm. Okay, so given that SMHFIT is built on top of HF pipelines, in principle it should be possible to pass some kwargs somewhere at pipeline instantiation, right?

It looks like the actual HF pipeline is instantiated inside transformers_utils.py here.

Do you happen to know how I could get some **kwargs down to that method? Given that the ASR parameters can apparently be set at pipeline creation, if there were a way for me to pass stride_length_s and chunk_length_s down to that method, then my model would be properly set up for all subsequent inference calls…

It looks like handler_service.py calls the get_pipeline method here, but it doesn’t look like it passes through any extra parameters…

Can the load method be overridden? The docs only mention overriding model_fn, transform_fn, input_fn, predict_fn, and output_fn.

Thanks again @marshmellow77!

model_fn() is the right place to override how the model is loaded; see also this example.
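
As far as I recall, the override hooks documented for the toolkit have these signatures (skeleton only, bodies omitted):

# inference.py skeleton -- the toolkit picks up whichever of these you define
def model_fn(model_dir):
    """Load and return the model (or pipeline) from model_dir."""

def input_fn(input_data, content_type):
    """Deserialize the request body."""

def predict_fn(data, model):
    """Run inference with whatever model_fn returned."""

def output_fn(prediction, accept):
    """Serialize the prediction for the response."""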

I think the best way to pass parameters into model instantiation is via OS environment variables, as I tried to outline here.

One more asset that might be valuable when trying to set this up could be the SM SSH Helper library. It allows you to SSH into your endpoint and see what is going on in there, run a debugger, etc.

Just a quick update here @marshmellow77. I got this working using your original suggestion of instantiating the pipeline in model_fn with the chunk and stride args there. It’s not perfect, because it still doesn’t allow me to dynamically choose different chunk or stride lengths at inference time (even though the pipeline itself accepts these args), but it’s a step in the right direction and working for now. I’ll report back if I figure out an elegant way to pass params at inference time, presumably by overriding predict_fn (though I haven’t tried it yet).

Thanks again for your help. Really appreciate it.


Thanks for the update. Based on what I have seen in the Python SDK code for the DataSerializer, I wasn’t able to figure out a way to pass parameters along - it seems to me the code just doesn’t account for that.

If I have time this week or next, I’m thinking of raising an issue and/or a PR for this.

But if you do manage to figure out a way, please update this thread, I’d be very interested too :slight_smile:

Another thought - have you considered using the Whisper API instead of the HF API in the endpoint? It seems that the Whisper API can deal with audio files longer than 30 secs: GitHub - openai/whisper: Robust Speech Recognition via Large-Scale Weak Supervision

That way you wouldn’t have to provide any parameters at all :slight_smile:

Ha, not a bad idea! Are you aware of any documentation on how to deploy the Whisper API directly (to AWS) rather than via the HF API?

I mean, it’s your inference script, you can do whatever you want in it as long as the required libraries are installed (via the requirements.txt file).

So, as long as you make sure the Whisper library is installed, you can just load the Whisper model and use that for inference. Something like

import whisper

def model_fn(model_dir):
    model = whisper.load_model("large")
    return model

def predict_fn(data, model):
    result = model.transcribe(data)
    return result["text"]

should work.

I will try this out later today myself (because I’m also starting on a project that involves Whisper) and get back to you.

So, this worked like a charm for me:

import whisper

def model_fn(model_dir):
    model = whisper.load_model("large-v2")
    return model

def predict_fn(audio_bytes, model):
    audio_file = "tmp.mp3"
    
    # need to put the byte stream into a tmp file as model.transcribe() expects a file name
    with open(audio_file, "wb") as binary_file:
        binary_file.write(audio_bytes['inputs'])
        
    result = model.transcribe(audio_file)

    return {"detected_language": result["language"], "transcription": result["text"]}

This is my requirements.txt file:

transformers==4.25.1
git+https://github.com/openai/whisper.git
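
On the client side, deploying and calling this should look much like the HuBERT example at the top of the thread (untested sketch; the instance type is a placeholder, and huggingface_model is assumed to be a HuggingFaceModel wrapping the inference script above):

from sagemaker.serializers import DataSerializer
from sagemaker.deserializers import JSONDeserializer

# raw audio bytes go out with an audio content type; the toolkit's default
# input_fn hands them to predict_fn as {"inputs": <bytes>}, as used above
audio_serializer = DataSerializer(content_type="audio/x-audio")

predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type="ml.g4dn.xlarge",  # placeholder; large-v2 really wants a GPU
    serializer=audio_serializer,
    deserializer=JSONDeserializer(),
)

res = predictor.predict(data="sample1.mp3")  # local file path; the serializer reads the bytes
print(res)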

Beautiful. Thanks! I’ll give it a shot.

How do you make the predictions then?

I get: TypeError: Object of type bytes is not JSON serializable.

Can you show me how you set up the predictor and deploy it?

I tried your custom predict_fn() and I get:

ModelError: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received server error (500) from primary with message "[Errno 30] Read-only file system:

Do you know how I can solve it?
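
One guess (not confirmed in this thread): inside the endpoint container the filesystem may only be writable under /tmp, so writing the temporary audio file there might avoid the Errno 30. A sketch of that variant of predict_fn:

import os

def predict_fn(audio_bytes, model):
    audio_file = "/tmp/tmp.mp3"  # /tmp is usually writable inside the container

    with open(audio_file, "wb") as binary_file:
        binary_file.write(audio_bytes["inputs"])

    result = model.transcribe(audio_file)
    os.remove(audio_file)  # clean up between invocations

    return {"detected_language": result["language"], "transcription": result["text"]}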