Batch_transform Pipeline?

Hello,

I fine-tuned two BERT models which (1) classify customer reviews and tag them with different labels and (2) detect the sentiment of each text. I process these texts in a weekly batch on AWS SageMaker. Right now I am running two separate batch transform jobs, one that predicts the class and one that predicts the sentiment. My question is whether it is possible to combine both models into one batch transform job. My fine-tuned models are stored in my S3 bucket in tar.gz format, and my code currently looks like this:

# package the inference script and pre-trained classifier model into .tar.gz format
!cd model_token && tar zcvf model.tar.gz * 
!mv model_token/model.tar.gz ./model.tar.gz

# upload pre-trained classifier model to s3 bucket
model_url = s3_path_join("s3://",sagemaker_session_bucket,"batch_transform/model")
print(f"Uploading Model to {model_url}")
model_uri = S3Uploader.upload('model.tar.gz',model_url)
print(f"Uploaded model to {model_uri}")

#same procedure for sentiment model
!cd sentiment_token && tar zcvf sentiment.tar.gz * 
!mv sentiment_token/sentiment.tar.gz ./sentiment.tar.gz
model_url = s3_path_join("s3://",sagemaker_session_bucket,"batch_transform/model")
print(f"Uploading Model to {model_url}")
sentiment_uri = S3Uploader.upload('sentiment.tar.gz',model_url)
print(f"Uploaded model to {sentiment_uri}")


from sagemaker.huggingface.model import HuggingFaceModel

# create Hugging Face Model Class for classifier
huggingface_model = HuggingFaceModel(
   model_data=model_uri, # S3 uri of the packaged classifier model
   role=role, # iam role with permissions to create an Endpoint
   transformers_version="4.6", # transformers version used
   pytorch_version="1.7", # pytorch version used
   py_version='py36', # python version used
)

# create Transformer to run our batch job
batch_job = huggingface_model.transformer(
    instance_count=1,
    instance_type='ml.g4dn.xlarge',
    output_path=output_s3_path, # we are using the same s3 path to save the output with the input
    strategy='SingleRecord')

# starts batch transform job and uses s3 data as input
batch_job.transform(
    data=s3_file_uri,
    content_type='application/json',    
    split_type='Line')


#same for sentiment 
huggingface_model = HuggingFaceModel(
   model_data=sentiment_uri, # S3 uri of the packaged sentiment model
   role=role, # iam role with permissions to create an Endpoint
   transformers_version="4.6", # transformers version used
   pytorch_version="1.7", # pytorch version used
   py_version='py36', # python version used
)

# create Transformer to run our batch job
batch_job = huggingface_model.transformer(
    instance_count=1,
    instance_type='ml.g4dn.xlarge',
    output_path=output_s3_path, # we are using the same s3 path to save the output with the input
    strategy='SingleRecord')

# starts batch transform job and uses s3 data as input
batch_job.transform(
    data=s3_file_uri,
    content_type='application/json',    
    split_type='Line')

Thanks in advance!

Hey @marlon89,

Currently, Batch Transform doesn't support multi-model endpoints. But what you could do is create a custom inference.py which contains both models and runs predictions against both.
Or you could use something like SageMaker Pipelines or an AWS Lambda function to create an automated pipeline that takes care of it, so you don't need to start your batch transform jobs manually.
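The two-models-in-one-handler idea could be sketched like this. This is only an illustration of the merging logic: the two models are replaced by hypothetical placeholder callables, whereas a real inference.py would load both fine-tuned pipelines from the model directory in model_fn.

```python
# Sketch of a custom inference.py that runs two models per record.
# The pipelines below are hypothetical placeholders; in a real handler
# you would load both fine-tuned models from model_dir, e.g.
#   classifier = pipeline("text-classification", model=f"{model_dir}/classifier")
#   sentiment  = pipeline("text-classification", model=f"{model_dir}/sentiment")

def model_fn(model_dir):
    classifier = lambda text: {"label": "complaint", "score": 0.9}  # placeholder
    sentiment = lambda text: {"label": "negative", "score": 0.8}    # placeholder
    return classifier, sentiment

def predict_fn(data, models):
    classifier, sentiment = models
    text = data["inputs"]
    # One record in, one merged prediction out
    return {
        "class": classifier(text),
        "sentiment": sentiment(text),
    }
```

With this approach a single batch transform job produces one output line per input record, already containing both predictions.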

Or you could add wait=False to batch_job.transform which tells the SDK not to wait until the job completes and your jobs would run in “parallel”.

Probably the easiest way to start and automate this is to create an AWS Lambda function with a CloudWatch trigger (run every week on day X) and start your batch transform jobs from it.


Hey Phil,

So I have written two predictors in one notebook to run the different predictions on the same text, meaning I have two HuggingFaceModels with two batch transform jobs in one notebook. The last issue I am facing is that in each of those two batch jobs I have to define the output path:

batch_job = huggingface_model.transformer(
    instance_count=1,
    instance_type='ml.g4dn.xlarge',
    output_path=output_s3_path, 
    strategy='SingleRecord')

So I am getting two output files when I just want the predictions of both models in one. Is that somehow possible without a workaround?

Hey,

batch transform offers something called join_source, where you can join input and output files.

  • join_source ( str ) – The source of data to be joined to the transform output. It can be set to ‘Input’ meaning the entire input record will be joined to the inference result. You can use OutputFilter to select the useful portion before uploading to S3. (default: None). Valid values: Input, None.

But I am not sure if this works with the JSON/JSON-lines structure we need for Hugging Face models. You can find more about it here: Associate Prediction Results with Input Records - Amazon SageMaker

The easiest might be to write a custom Python function which post-processes and merges your data files after the batch transform jobs have finished. If you use SageMaker Pipelines you could use a Lambda step for this.
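Such a merge step could look roughly like this. A minimal sketch assuming both jobs wrote line-delimited JSON outputs for the same input file in the same order (the key names "class" and "sentiment" are my own choice):

```python
import json

def merge_predictions(class_lines, sentiment_lines):
    """Merge two JSON-lines batch transform outputs record by record.

    Assumes both jobs processed the same input file, so line i of one
    output corresponds to line i of the other.
    """
    merged = []
    for class_line, sent_line in zip(class_lines, sentiment_lines):
        record = {
            "class": json.loads(class_line),
            "sentiment": json.loads(sent_line),
        }
        merged.append(json.dumps(record))
    return merged

# In a Lambda step you would download the two .out files from S3,
# merge them, and upload the combined file, e.g.:
# merged = merge_predictions(open("reviews.jsonl.out"), open("sentiment.jsonl.out"))
```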


Seems not to work with .json files:

ClientError: An error occurred (ValidationException) when calling the CreateTransformJob operation: ContentType ("application/json") in TransformInput has to be the same as Accept ("null") in TransformOutput when JoinSource is "Input".

I think the error is saying something different.
Could you try adding the accept parameter to the transformer?

# create Transformer to run our batch job
batch_job = huggingface_model.transformer(
    instance_count=1,
    instance_type='ml.g4dn.xlarge',
    output_path=output_s3_path, # we are using the same s3 path to save the output with the input
    accept="application/json",
    strategy='SingleRecord')

I hope I didn’t mix anything up. This is my code now:

# create Hugging Face Model Class
huggingface_model = HuggingFaceModel(
   model_data=model_uri, # S3 uri of the packaged model
   role=role, # iam role with permissions to create an Endpoint
   transformers_version="4.6", # transformers version used
   pytorch_version="1.7", # pytorch version used
   py_version='py36', # python version used
)

# create Transformer to run our batch job
batch_job = huggingface_model.transformer(
    instance_count=1,
    instance_type='ml.g4dn.xlarge',
    output_path=output_s3_path, # we are using the same s3 path to save the output with the input
    strategy='SingleRecord',
    accept="application/json")

# starts batch transform job and uses s3 data as input
batch_job.transform(
    data=s3_file_uri,
    content_type='application/json',    
    split_type='Line',
    join_source='Input')

And this is my output:

ClientError: An error occurred (ValidationException) when calling the CreateTransformJob operation: SplitType ("Line") in TransformInput has to be the same as AssembleWith ("null") in TransformOutput when JoinSource is "Input".

As the error says, you need to adjust assemble_with to match the split_type.
=>

# create Transformer to run our batch job
batch_job = huggingface_model.transformer(
   instance_count=1,
   instance_type='ml.g4dn.xlarge',
   output_path=output_s3_path, # we are using the same s3 path to save the output with the input
   accept="application/json",
   assemble_with="Line",
   strategy='SingleRecord')

You can find the documentation here: Transformer — sagemaker 2.198.0 documentation

  • assemble_with ( str ) – How the output is assembled (default: None). Valid values: ‘Line’ or ‘None’.

Thank you for your support @philschmid! I am new to SageMaker and it’s not that easy for me. One more question: Hugging Face models on SageMaker only accept JSON files and no CSV, correct?

Yes and no. CSV files are accepted, but only if you provide a header and the complete file so the fields can be determined; also, not all tasks are supported.
For batch transform it is JSON only. If you want to support another format, you need to create a custom inference.py and overwrite the methods.
