We have a requirement to apply several models to JSON documents. One of the models we are using is google/pegasus-xsum, which is running really slowly.
We need to summarize content from around 100 million records, where the text to summarize varies between 100 and 3,000 characters.
I am running the code on a g5.12xlarge AWS EC2 machine, and it takes around 1 second to generate each output.
At that rate, running through 100 million documents will take a very long time, and we may need to do it again and again.
Is there a way to make it faster, or is 1 second per document the best I can get?
To serve this, I have hosted a Python FastAPI REST endpoint on the g5.12xlarge machine, and I send it JSON containing the text.
I also tried torch.nn.DataParallel, but it did not make much difference.
The code I use to run summarization is posted in full below.
According to the Amazon EC2 G5 Instances page on Amazon Web Services, your instance has 4 GPUs available. To summarize content efficiently, you can indeed replicate the model on each of the 4 GPUs, so that each GPU processes a chunk of the data in parallel. Moreover, on each GPU you can leverage batched generation to speed things up further.
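Here is a minimal sketch of that idea, decoupled from the FastAPI endpoint. It round-robins a list of texts into 4 shards, starts one worker process per GPU with its own pegasus-xsum replica, and runs batched generate() on each shard. Everything here is illustrative rather than definitive: the names (summarize_shard, summarize_all, NUM_GPUS, BATCH_SIZE) are my own, and a batch size of 32 is just a starting point to tune against your input lengths and GPU memory.

import torch
import torch.multiprocessing as mp
from transformers import PegasusForConditionalGeneration, PegasusTokenizer

MODEL_NAME = "google/pegasus-xsum"
NUM_GPUS = 4     # the g5.12xlarge exposes 4 A10G GPUs
BATCH_SIZE = 32  # assumption: tune down if long inputs exhaust GPU memory

def summarize_shard(rank, shard, out_queue):
    # one worker per GPU: load a full model replica on cuda:<rank>
    device = f"cuda:{rank}"
    tokenizer = PegasusTokenizer.from_pretrained(MODEL_NAME)
    model = PegasusForConditionalGeneration.from_pretrained(MODEL_NAME).to(device)
    model.eval()
    summaries = []
    with torch.inference_mode():
        # batched generation: pad each batch to its longest member, generate, decode together
        for i in range(0, len(shard), BATCH_SIZE):
            batch = tokenizer(shard[i:i + BATCH_SIZE], truncation=True,
                              padding="longest", return_tensors="pt").to(device)
            generated = model.generate(**batch)
            summaries.extend(tokenizer.batch_decode(generated, skip_special_tokens=True))
    out_queue.put((rank, summaries))

def summarize_all(texts):
    # shard the inputs round-robin, one shard per GPU, and run the shards in parallel
    mp.set_start_method("spawn", force=True)  # needed before touching CUDA in child processes
    out_queue = mp.Queue()
    shards = [texts[r::NUM_GPUS] for r in range(NUM_GPUS)]
    procs = [mp.Process(target=summarize_shard, args=(r, shards[r], out_queue))
             for r in range(NUM_GPUS)]
    for p in procs:
        p.start()
    results = dict(out_queue.get() for _ in procs)  # drain the queue before joining
    for p in procs:
        p.join()
    # undo the round-robin split so outputs line up with the input order
    ordered = [None] * len(texts)
    for r in range(NUM_GPUS):
        ordered[r::NUM_GPUS] = results[r]
    return ordered

Batching is likely the bigger win of the two: generate() on a single short document leaves most of an A10G idle, so throughput tends to scale with batch size until the GPU saturates. It also explains why torch.nn.DataParallel showed little difference in your test: it parallelizes by splitting each batch across the GPUs, and with one document per request there is nothing to split.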
This is what my entire code looks like.
I am just starting out with AI and don't have much experience with it, or even with Python.
Please let me know how to utilize multiple GPUs with this code.
I am calling apply_all_models once per JSON document, and I have over 100 million documents to process; right now it gets through around 70k a day.
Any guidance is appreciated.
import json
import sys
import time
import traceback

sys.path.insert(0, 'modules')

import torch
import uvicorn
import numpy as np
from scipy.special import softmax
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import get_swagger_ui_html
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer
from transformers import pipeline
from transformers import PegasusForConditionalGeneration, PegasusTokenizer, AutoTokenizer, AutoConfig, \
    AutoModelForSequenceClassification, AutoModelForTokenClassification
from flair.data import Sentence
from flair.models import SequenceTagger

app = FastAPI()
# device = "mps"
device = "cuda"
device0 = "cuda:0"
device1 = "cuda:1"
device2 = "cuda:2"
device3 = "cuda:3"
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],  # allows all methods (GET, POST, etc.)
    allow_headers=["*"],  # allows all headers
)
uvicorn.config.LOGGING_CONFIG["loggers"]["uvicorn.access"]["level"] = "ERROR"
uvicorn.config.LOGGING_CONFIG["loggers"]["uvicorn.error"]["level"] = "ERROR"
uvicorn.config.LOGGING_CONFIG["loggers"]["uvicorn"]["level"] = "ERROR"
m2 = 'sentence-transformers/distiluse-base-multilingual-cased-v2'
model = SentenceTransformer(m2, device=device)
# note: .module immediately unwraps the DataParallel wrapper again, so this line has no effect
model = torch.nn.DataParallel(model).module
sentiment_model_name = "cardiffnlp/twitter-roberta-base-sentiment-latest"
sentiment_tokenizer = AutoTokenizer.from_pretrained(sentiment_model_name)
sentiment_config = AutoConfig.from_pretrained(sentiment_model_name)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(sentiment_model_name)
model_name = "google/pegasus-xsum"
tokenizer = PegasusTokenizer.from_pretrained(model_name)
pegasus_model = PegasusForConditionalGeneration.from_pretrained(model_name).to(device1)
label_tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-large-finetuned-conll03-english")
label_model = AutoModelForTokenClassification.from_pretrained("xlm-roberta-large-finetuned-conll03-english").to(device3)
classifier = pipeline("ner", model=label_model, tokenizer=label_tokenizer)
tagger = SequenceTagger.load("flair/ner-english-ontonotes-large")
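# note on device placement: the sentence-transformer sits on "cuda" (cuda:0), pegasus on cuda:1,
# the NER model on cuda:3 and the sentiment model on the CPU; the flair tagger loads onto flair's
# default device (cuda:0 when one is available), and cuda:2 is never used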
class QueryRequest(BaseModel):
    url: str
    query: list[str]


class EmbeddingRequest(BaseModel):
    sentences: list[str]


class TLDRRequest(BaseModel):
    text: str


class Sentiment(BaseModel):
    text: str


class Topics(BaseModel):
    text: list[str]


class TokenRequest(BaseModel):
    text: str
@app.get("/")
async def root():
return get_swagger_ui_html(
openapi_url="/openapi.json",
title="My API",
oauth2_redirect_url="https://example.com/oauth2/authorize",
)
def convert_string_to_vector(sentences):
    start_time = time.time()
    embeddings = model.encode(sentences)
    print("--- convert_string_to_vector -%s--" % (time.time() - start_time))
    return {"embeddings": embeddings.tolist()}
def convert_string_to_tldr(text):
    start_time = time.time()
    batch = tokenizer(text, truncation=True, padding="longest", return_tensors="pt").to(device1)
    translated = pegasus_model.generate(**batch)
    tgt_text = tokenizer.batch_decode(translated, skip_special_tokens=True)
    print("--- convert_string_to_tldr -%s--" % (time.time() - start_time))
    return {"tldr": tgt_text[0]}
def preprocess(text):
    new_text = []
    for t in text.split(" "):
        t = '@user' if t.startswith('@') and len(t) > 1 else t
        t = 'http' if t.startswith('http') else t
        new_text.append(t)
    return " ".join(new_text)
def get_sentiment(text):
    start_time = time.time()
    text = preprocess(text)
    encoded_input = sentiment_tokenizer(text, return_tensors='pt')
    output = sentiment_model(**encoded_input)
    scores = output[0][0].detach().numpy()
    scores = softmax(scores)
    ranking = np.argsort(scores)
    highest_score = 0
    final_sentiment = 'neutral'
    ranking = ranking[::-1]
    for i in range(scores.shape[0]):
        l = sentiment_config.id2label[ranking[i]]
        s = scores[ranking[i]]
        if s > highest_score:
            highest_score = s
            final_sentiment = l
    # return {"sentiment": final_sentiment, "score": float(highest_score)}
    print("--- get_sentiment - %s--" % (time.time() - start_time))
    return final_sentiment
def process_classifier(text):
    start_time = time.time()
    label = classifier(text)
    entity_list = []
    index = 0
    if len(label[0]) > 0:
        for lab in label[0]:
            word = str(lab['word'])
            # "▁" is the sentencepiece marker for the start of a new word;
            # sub-tokens without it are appended onto the previous entry
            if word.startswith("▁"):
                entity_list.append(word.replace("▁", ""))
                index = index + 1
            else:
                size = len(entity_list)
                if size == 0:
                    entity_list.append(word)
                else:
                    entity_list[size - 1] = entity_list[size - 1] + word
        print("--- process_classifier - %s " % (time.time() - start_time))
        return list(set(entity_list))
    else:
        return None
def get_topics_v2(text):
    start_time = time.time()
    sentence = Sentence(text)
    tagger.predict(sentence)
    tokens = []
    for entity in sentence.get_spans('ner'):
        for token in entity.tokens:
            tokens.append(token.text)
    print("--- get_topics_v2 - %s " % (time.time() - start_time))
    return tokens
@app.post("/apply_all_models")
async def apply_all_docs(json_string, thread_id):
start_time = time.time()
doc = json.loads(json_string)
print("started service for thread :" + thread_id + " and doc id " + str(doc['id']))
try:
print('id:' + doc['id'])
if 'ta' in doc:
combinedSentence = " ".join(doc['ta'])
doc['ta_ai'] = convert_string_to_vector([combinedSentence])['embeddings'][0]
# print(doc['ta_ai'])
if 'ap' in doc:
doc['ap_ai'] = convert_string_to_vector([doc['ap']])['embeddings'][0]
# print(doc['ap_ai'])
if 'su' in doc:
combinedSentence2 = " ".join(doc['su'])
doc['su_ai'] = convert_string_to_vector([combinedSentence2])['embeddings'][0]
# print(doc['su_ai'])
if 'ab' in doc:
ab_value = doc['ab']
combinedSentence = " ".join(ab_value)
start_time1 = time.time()
tldr = convert_string_to_tldr([combinedSentence])['tldr']
print("getting back tldr :" + str(time.time() - start_time1))
doc['ab_ai'] = tldr
doc['sentiment'] = get_sentiment(tldr)
doc['topic'] = process_classifier([ab_value])
doc['topic_v2'] = get_topics_v2(combinedSentence)
print("--- time for entire document - %s " % (time.time() - start_time))
except Exception as e:
print(doc['id'])
print("An unexpected error occurred:", e)
traceback.print_exc()
# print("finished service for thread :" + thread_id + " and doc id " + str(doc['id']) + " in time :" + str(
# time.time() - start_time))
return doc
if __name__ == "__main__":
    uvicorn.run("HuggingFaceWebInterface_ec2:app", host="0.0.0.0", port=8001, workers=3)
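# note: workers=3 starts three separate processes, each of which imports this module and
# loads every model again, so expect roughly triple the GPU memory footprint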