This is my code for fine-tuning BERT to identify credit card and phone numbers. I have a dataset of sentences in which each word has a tag (o, PHN, CRD). I am not getting the required results: the model is not accurate, it sometimes tags even a date of birth as a credit card, and it fails to identify a phone number or credit card when the digits contain spaces. I am very new to this; is there any way I can improve the BERT model so that it identifies these entities more accurately?
This is my code:
# !pip install transformers seqeval[gpu]
from transformers import pipeline
from seqeval.metrics import classification_report
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertConfig, BertForTokenClassification
from torch import cuda
device = 'cuda' if cuda.is_available() else 'cpu'
file = "/home/dev/bert_trans/specific_testing_v2_re.csv"
data = pd.read_csv(file, encoding='unicode_escape')
frequencies = data.Tag.value_counts()
tags = {}
for tag, count in zip(frequencies.index, frequencies):
    if tag != "o":  # the outside tag is lowercase "o" in this dataset
        if tag[0:5] not in tags.keys():
            tags[tag[0:5]] = count
        else:
            tags[tag[0:5]] += count
# pandas has a very handy "forward fill" function to fill missing values based on the last upper non-nan value
data = data.ffill()
# let's create a new column called "sentence" which groups the words by sentence
data['sentence'] = data[['Sentence #','Word','Tag']].groupby(['Sentence #'])['Word'].transform(lambda x: ' '.join(x))
# let's also create a new column called "word_labels" which groups the tags by sentence
data['word_labels'] = data[['Sentence #','Word','Tag']].groupby(['Sentence #'])['Tag'].transform(lambda x: ','.join(x))
data.head()
label2id = {k: v for v, k in enumerate(data.Tag.unique())}
id2label = {v: k for v, k in enumerate(data.Tag.unique())}
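# For illustration only (the exact order depends on the order in which tags first appear in the CSV):
# with the tags o, PHN and CRD these mappings come out roughly as
# label2id = {'o': 0, 'PHN': 1, 'CRD': 2} and id2label = {0: 'o', 1: 'PHN', 2: 'CRD'}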
data = data[["sentence", "word_labels"]].drop_duplicates().reset_index(drop=True)
MAX_LEN = 128
TRAIN_BATCH_SIZE = 4
VALID_BATCH_SIZE = 2
EPOCHS = 1
LEARNING_RATE = 1e-05
MAX_GRAD_NORM = 10
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
def tokenize_and_preserve_labels(sentence, text_labels, tokenizer):
    """
    Word piece tokenization makes it difficult to match word labels
    back up with individual word pieces. This function tokenizes each
    word one at a time so that it is easier to preserve the correct
    label for each subword. It is, of course, a bit slower in processing
    time, but it will help our model achieve higher accuracy.
    """
    tokenized_sentence = []
    labels = []
    sentence = sentence.strip()
    # note: this zip assumes the sentence splits into exactly as many words as there are labels,
    # i.e. no labelled "word" contains internal whitespace
    for word, label in zip(sentence.split(), text_labels.split(",")):
        # Tokenize the word and count # of subwords the word is broken into
        tokenized_word = tokenizer.tokenize(word)
        n_subwords = len(tokenized_word)
        # Add the tokenized word to the final tokenized word list
        tokenized_sentence.extend(tokenized_word)
        # Add the same label to the new list of labels `n_subwords` times
        labels.extend([label] * n_subwords)
    return tokenized_sentence, labels
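# Illustrative example of what this returns (the exact word pieces depend on the BERT vocabulary):
# tokenize_and_preserve_labels("my number is 9876543210", "o,o,o,PHN", tokenizer)
# -> (['my', 'number', 'is', '98', '##7', '##65', '##43', '##210'],
#     ['o', 'o', 'o', 'PHN', 'PHN', 'PHN', 'PHN', 'PHN'])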
class dataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_len):
        self.len = len(dataframe)
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __getitem__(self, index):
        # step 1: tokenize (and adapt corresponding labels)
        sentence = self.data.sentence[index]
        word_labels = self.data.word_labels[index]
        tokenized_sentence, labels = tokenize_and_preserve_labels(sentence, word_labels, self.tokenizer)
        # step 2: add special tokens (and corresponding labels)
        tokenized_sentence = ["[CLS]"] + tokenized_sentence + ["[SEP]"]  # add special tokens
        labels.insert(0, "o")  # add outside label for [CLS] token
        labels.append("o")  # add outside label for [SEP] token (append, not insert(-1), so it lines up with [SEP])
        # step 3: truncating/padding
        maxlen = self.max_len
        if len(tokenized_sentence) > maxlen:
            # truncate
            tokenized_sentence = tokenized_sentence[:maxlen]
            labels = labels[:maxlen]
        else:
            # pad
            tokenized_sentence = tokenized_sentence + ['[PAD]' for _ in range(maxlen - len(tokenized_sentence))]
            labels = labels + ["o" for _ in range(maxlen - len(labels))]
        # step 4: obtain the attention mask
        attn_mask = [1 if tok != '[PAD]' else 0 for tok in tokenized_sentence]
        # step 5: convert tokens to input ids
        ids = self.tokenizer.convert_tokens_to_ids(tokenized_sentence)
        label_ids = [label2id[label] for label in labels]
        # the following line is deprecated
        # label_ids = [label if label != 0 else -100 for label in label_ids]
        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(attn_mask, dtype=torch.long),
            # 'token_type_ids': torch.tensor(token_ids, dtype=torch.long),
            'targets': torch.tensor(label_ids, dtype=torch.long)
        }

    def __len__(self):
        return self.len
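# Optional variant of "step 5" above (a sketch, related to the commented-out -100 line in
# __getitem__): the PyTorch/Transformers convention is that label id -100 is ignored by the
# cross-entropy loss, so [PAD] positions could be excluded from the loss like this:
# label_ids = [lid if tok != '[PAD]' else -100
#              for tok, lid in zip(tokenized_sentence, label_ids)]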
train_size = 0.8
train_dataset = data.sample(frac=train_size, random_state=200)
test_dataset = data.drop(train_dataset.index).reset_index(drop=True)
train_dataset = train_dataset.reset_index(drop=True)
training_set = dataset(train_dataset, tokenizer, MAX_LEN)
testing_set = dataset(test_dataset, tokenizer, MAX_LEN)
# print the first 30 tokens and corresponding labels
for token, label in zip(tokenizer.convert_ids_to_tokens(training_set[0]["ids"][:30]),
                        training_set[0]["targets"][:30]):
    print('{0:10} {1}'.format(token, id2label[label.item()]))
train_params = {'batch_size': TRAIN_BATCH_SIZE,
                'shuffle': True,
                'num_workers': 0
                }
test_params = {'batch_size': VALID_BATCH_SIZE,
               'shuffle': True,
               'num_workers': 0
               }
training_loader = DataLoader(training_set, **train_params)
testing_loader = DataLoader(testing_set, **test_params)
model = BertForTokenClassification.from_pretrained('bert-base-uncased',
                                                   num_labels=len(id2label),
                                                   id2label=id2label,
                                                   label2id=label2id)
model.to(device)
ids = training_set[0]["ids"].unsqueeze(0)
mask = training_set[0]["mask"].unsqueeze(0)
targets = training_set[0]["targets"].unsqueeze(0)
ids = ids.to(device)
mask = mask.to(device)
targets = targets.to(device)
outputs = model(input_ids=ids, attention_mask=mask, labels=targets)
initial_loss = outputs[0]
tr_logits = outputs[1]
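# Quick sanity check: with a randomly initialised classification head, the initial loss should be
# roughly ln(num_labels) (about 1.1 for 3 labels), since the untrained model predicts near-uniform
# probabilities over the labels.
print(initial_loss, np.log(len(label2id)))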
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)
# Defining the training function on the 80% of the dataset for tuning the bert model
def train(epoch):
    tr_loss, tr_accuracy = 0, 0
    nb_tr_examples, nb_tr_steps = 0, 0
    tr_preds, tr_labels = [], []
    # put model in training mode
    model.train()
    for idx, batch in enumerate(training_loader):
        ids = batch['ids'].to(device, dtype=torch.long)
        mask = batch['mask'].to(device, dtype=torch.long)
        targets = batch['targets'].to(device, dtype=torch.long)
        outputs = model(input_ids=ids, attention_mask=mask, labels=targets)
        loss, tr_logits = outputs.loss, outputs.logits
        tr_loss += loss.item()
        nb_tr_steps += 1
        nb_tr_examples += targets.size(0)
        if idx % 100 == 0:
            loss_step = tr_loss / nb_tr_steps
            print(f"Training loss per 100 training steps: {loss_step}")
        # compute training accuracy
        flattened_targets = targets.view(-1)  # shape (batch_size * seq_len,)
        active_logits = tr_logits.view(-1, model.num_labels)  # shape (batch_size * seq_len, num_labels)
        flattened_predictions = torch.argmax(active_logits, axis=1)  # shape (batch_size * seq_len,)
        # now, use mask to determine where we should compare predictions with targets (includes [CLS] and [SEP] token predictions)
        active_accuracy = mask.view(-1) == 1  # active accuracy is also of shape (batch_size * seq_len,)
        targets = torch.masked_select(flattened_targets, active_accuracy)
        predictions = torch.masked_select(flattened_predictions, active_accuracy)
        tr_preds.extend(predictions)
        tr_labels.extend(targets)
        tmp_tr_accuracy = accuracy_score(targets.cpu().numpy(), predictions.cpu().numpy())
        tr_accuracy += tmp_tr_accuracy
        # backward pass: clear old gradients, backpropagate, clip the fresh gradients, then step
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(
            parameters=model.parameters(), max_norm=MAX_GRAD_NORM
        )
        optimizer.step()
    epoch_loss = tr_loss / nb_tr_steps
    tr_accuracy = tr_accuracy / nb_tr_steps
    print(f"Training loss epoch: {epoch_loss}")
    print(f"Training accuracy epoch: {tr_accuracy}")
for epoch in range(EPOCHS):
    print(f"Training epoch: {epoch + 1}")
    train(epoch)
def valid(model, testing_loader):
    # put model in evaluation mode
    model.eval()
    eval_loss, eval_accuracy = 0, 0
    nb_eval_examples, nb_eval_steps = 0, 0
    eval_preds, eval_labels = [], []
    with torch.no_grad():
        for idx, batch in enumerate(testing_loader):
            ids = batch['ids'].to(device, dtype=torch.long)
            mask = batch['mask'].to(device, dtype=torch.long)
            targets = batch['targets'].to(device, dtype=torch.long)
            outputs = model(input_ids=ids, attention_mask=mask, labels=targets)
            loss, eval_logits = outputs.loss, outputs.logits
            eval_loss += loss.item()
            nb_eval_steps += 1
            nb_eval_examples += targets.size(0)
            if idx % 100 == 0:
                loss_step = eval_loss / nb_eval_steps
                print(f"Validation loss per 100 evaluation steps: {loss_step}")
            # compute evaluation accuracy
            flattened_targets = targets.view(-1)  # shape (batch_size * seq_len,)
            active_logits = eval_logits.view(-1, model.num_labels)  # shape (batch_size * seq_len, num_labels)
            flattened_predictions = torch.argmax(active_logits, axis=1)  # shape (batch_size * seq_len,)
            # now, use mask to determine where we should compare predictions with targets (includes [CLS] and [SEP] token predictions)
            active_accuracy = mask.view(-1) == 1  # active accuracy is also of shape (batch_size * seq_len,)
            targets = torch.masked_select(flattened_targets, active_accuracy)
            predictions = torch.masked_select(flattened_predictions, active_accuracy)
            eval_labels.extend(targets)
            eval_preds.extend(predictions)
            tmp_eval_accuracy = accuracy_score(targets.cpu().numpy(), predictions.cpu().numpy())
            eval_accuracy += tmp_eval_accuracy
    # print(eval_labels)
    # print(eval_preds)
    labels = [id2label[id.item()] for id in eval_labels]
    predictions = [id2label[id.item()] for id in eval_preds]
    # print(labels)
    # print(predictions)
    eval_loss = eval_loss / nb_eval_steps
    eval_accuracy = eval_accuracy / nb_eval_steps
    print(f"Validation Loss: {eval_loss}")
    print(f"Validation Accuracy: {eval_accuracy}")
    return labels, predictions
labels, predictions = valid(model, testing_loader)
print(classification_report([labels], [predictions]))
sentence = "John got his new phone number which is +1(212)-313-5467."
inputs = tokenizer(sentence, padding='max_length', truncation=True, max_length=MAX_LEN, return_tensors="pt")
# move to gpu
ids = inputs["input_ids"].to(device)
mask = inputs["attention_mask"].to(device)
# forward pass
outputs = model(ids, mask)
logits = outputs[0]
active_logits = logits.view(-1, model.num_labels) # shape (batch_size * seq_len, num_labels)
flattened_predictions = torch.argmax(active_logits, axis=1) # shape (batch_size*seq_len,) - predictions at the token level
tokens = tokenizer.convert_ids_to_tokens(ids.squeeze().tolist())
token_predictions = [id2label[i] for i in flattened_predictions.cpu().numpy()]
wp_preds = list(zip(tokens, token_predictions)) # list of tuples. Each tuple = (wordpiece, prediction)
word_level_predictions = []
for pair in wp_preds:
    if (pair[0].startswith("##")) or (pair[0] in ['[CLS]', '[SEP]', '[PAD]']):
        # skip predictions for subword continuations and special tokens
        continue
    else:
        word_level_predictions.append(pair[1])
# we join tokens, if they are not special ones
str_rep = " ".join([t[0] for t in wp_preds if t[0] not in ['[CLS]', '[SEP]', '[PAD]']]).replace(" ##", "")
print(str_rep)
print(word_level_predictions)
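# For readability, the reconstructed words can be paired with their word-level predictions
# (both lists should have the same length once the "##" pieces are skipped above):
for word, tag in zip(str_rep.split(), word_level_predictions):
    print(f"{word}\t{tag}")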
pipe = pipeline(task="token-classification", model=model.to("cpu"), tokenizer=tokenizer, aggregation_strategy="simple")
pipe("my date of birth is 23-09-2023, Jakes's phone numbers are +1 (414)-123 6512, and +1 (662) 234 0010")
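# The pipeline returns a list of dicts, one per aggregated entity, with keys like
# 'entity_group', 'score', 'word', 'start' and 'end'; the exact entities and scores
# depend entirely on the trained model.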
model.save_pretrained("bert_pii_model")  # save_pretrained requires an output directory; this path is just a placeholder
My dataset looks like this:
Sentence: 1,Sure,o
,",",o
,``,o
,I,o
,went,o
,shopping,o
,yesterday,o
,",",o
,and,o
,at,o
,the,o
,checkout,o
,",",o
,I,o
,paid,o
,using,o
,my,o
,credit,o
,card,o
,:,o
,4755 8555 6999 2555,CRD
,'',o
,.,o
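For reference, after the forward fill and the groupby in the code above, this example collapses into a single row, roughly:
sentence = "Sure , `` I went shopping yesterday , and at the checkout , I paid using my credit card : 4755 8555 6999 2555 '' ."
word_labels = "o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,CRD,o,o"
(one tag per CSV row, so the whole spaced card number carries a single CRD tag).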
I would appreciate it if someone could help me improve the model.