How to build a custom question-answering head?

Using the TFBertForQuestionAnswering.from_pretrained() function, we get a predefined head on top of BERT, together with a loss function, both suitable for this task.
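For reference, the predefined head I mean is the one you get from the standard one-liner:

from transformers import TFBertForQuestionAnswering

model = TFBertForQuestionAnswering.from_pretrained("bert-base-multilingual-cased")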

My question is how to create a custom head without relying on TFAutoModelForQuestionAnswering.from_pretrained().

I want to do this because there is no place where the architecture of the head is explained clearly. By reading the code here we can see the architecture they are using, but I can’t be sure I understand their code 100%.

The Stack Overflow answer machine learning - How to Fine-tune HuggingFace BERT model for Text Classification - Stack Overflow is a good starting point. However, it covers only the classification task, which is much simpler.

'start_positions' and 'end_positions' are created following this tutorial.
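Roughly speaking, they are built by mapping the character span of the answer onto token indices using the tokenizer's offset mapping. A minimal sketch with placeholder data (not my actual dataset), assuming a fast tokenizer:

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-multilingual-cased")

# Placeholder example, just to illustrate the mapping
question = "Where is the Eiffel Tower located?"
context = "The Eiffel Tower is located in Paris."
answer = {"text": "Paris", "answer_start": context.find("Paris")}

encoding = tokenizer(
    question,
    context,
    max_length=256,
    truncation="only_second",
    padding="max_length",
    return_offsets_mapping=True,
)

start_char = answer["answer_start"]
end_char = start_char + len(answer["text"])

# Walk over the context tokens and find the ones covering the answer span
start_position = end_position = 0
sequence_ids = encoding.sequence_ids()
for idx, (start, end) in enumerate(encoding["offset_mapping"]):
    if sequence_ids[idx] != 1:  # skip question and special tokens
        continue
    if start <= start_char < end:
        start_position = idx
    if start < end_char <= end:
        end_position = idx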

So far, I’ve got the following:

import numpy as np
import tensorflow as tf
from transformers import TFAutoModel, create_optimizer

train_dataset
# Dataset({
#     features: ['input_ids', 'token_type_ids', 'attention_mask', 'start_positions', 'end_positions'],
#     num_rows: 99205
# })
train_dataset.set_format(type='tensorflow', columns=['input_ids', 'token_type_ids', 'attention_mask'])
features = {x: train_dataset[x] for x in ['input_ids', 'token_type_ids', 'attention_mask']}
labels = [train_dataset[x] for x in ['start_positions', 'end_positions']]
labels = np.array(labels).T
tfdataset = tf.data.Dataset.from_tensor_slices((features, labels)).batch(16)
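# Shapes at this point (assuming every example was padded to max_length=256):
# each tensor in `features` is (99205, 256) and `labels` is (99205, 2)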

input_ids = tf.keras.layers.Input(shape=(256,), dtype=tf.int32, name='input_ids')
token_type_ids = tf.keras.layers.Input(shape=(256,), dtype=tf.int32, name='token_type_ids')
attention_mask = tf.keras.layers.Input(shape=(256,), dtype=tf.int32, name='attention_mask')


bert = TFAutoModel.from_pretrained("bert-base-multilingual-cased")
# pass the inputs by keyword so they are not mis-mapped positionally
output = bert(input_ids=input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask).last_hidden_state
output = tf.keras.layers.Dense(2, name="qa_outputs")(output)
model = tf.keras.models.Model(inputs=[input_ids, token_type_ids, attention_mask], outputs=output)
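# Note: with this head the model outputs per-token logits of shape (batch, 256, 2),
# while the labels built above have shape (batch, 2)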


num_train_epochs = 3
num_train_steps = len(tfdataset) * num_train_epochs
optimizer, schedule = create_optimizer(
   init_lr=2e-5,
   num_warmup_steps=0,
   num_train_steps=num_train_steps,
   weight_decay_rate=0.01
)

def qa_loss(labels, logits):
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction=tf.keras.losses.Reduction.NONE
    )
    start_loss = loss_fn(labels[0], logits[0])
    end_loss = loss_fn(labels[1], logits[1])
    return (start_loss + end_loss) / 2.0


model.compile(
    loss=qa_loss,
    optimizer=optimizer
)

model.fit(tfdataset, epochs=num_train_epochs)

And I am getting the following error:

ValueError: `labels.shape` must equal `logits.shape` except for the last dimension. Received: labels.shape=(2,) and logits.shape=(256, 2)

It is complaining about the shape of the labels, which I did not expect, since I am using the SparseCategoricalCrossentropy loss.
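For reference, the per-batch shapes that Keras passes to qa_loss can be reproduced with a quick sketch (batch size 16 and sequence length 256, as above); they match the pair of shapes in the traceback:

import tensorflow as tf

labels_batch = tf.zeros((16, 2), dtype=tf.int32)         # what Keras passes as y_true
logits_batch = tf.zeros((16, 256, 2), dtype=tf.float32)  # what the model outputs as y_pred

# Inside qa_loss, labels[0] is the first sample of the batch, not the start positions
print(labels_batch[0].shape)   # (2,)
print(logits_batch[0].shape)   # (256, 2)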


Hello :wave:

For :hugs: transformers models it’s best to use the model’s built-in loss. For a more native Keras implementation and an explanation of why it should be that way, you can check out this tutorial. You can just call compile() without a loss.

As a convenience, all :hugs: Transformers models come with a default loss which matches their output head, although you’re of course free to use your own. Because the built-in loss is computed internally during the forward pass, when using it you may find that some Keras metrics misbehave or give unexpected outputs.
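For example, with the built-in head and loss the training setup reduces to roughly this (a sketch reusing the checkpoint and optimizer settings from your post; note that the start/end position columns then have to be included in the input features so the model can compute its loss during the forward pass — the dataset name below is hypothetical):

from transformers import TFAutoModelForQuestionAnswering, create_optimizer

model = TFAutoModelForQuestionAnswering.from_pretrained("bert-base-multilingual-cased")

optimizer, schedule = create_optimizer(
    init_lr=2e-5,
    num_warmup_steps=0,
    num_train_steps=len(tfdataset) * 3,
    weight_decay_rate=0.01,
)

# No `loss` argument: the model computes its own QA loss internally as long as
# 'start_positions' and 'end_positions' are present in the input dict
model.compile(optimizer=optimizer)
model.fit(tf_train_dataset_with_labels, epochs=3)  # hypothetical dataset whose feature dict includes the label columns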


Thank you for your reply, Merve.
I had seen that article before, and I understand that it is more convenient to use the built-in loss and question-answering head. However, I am writing my undergraduate thesis on this topic, so it is important for me to be clear about what is happening under the hood.

For future reference, I actually found a solution, which is simply to edit the TFBertForQuestionAnswering class itself!

For example, I added an additional Dense layer in the code below, trained the model as usual, and it worked :slight_smile:

import tensorflow as tf

from transformers import TFBertPreTrainedModel
from transformers import TFBertMainLayer
from transformers.modeling_tf_utils import TFQuestionAnsweringLoss, get_initializer, input_processing
from transformers.modeling_tf_outputs import TFQuestionAnsweringModelOutput
from transformers import BertConfig

class MY_TFBertForQuestionAnswering(TFBertPreTrainedModel, TFQuestionAnsweringLoss):
    # names with a '.' represent the authorized unexpected/missing layers when a TF model is loaded from a PT model
    _keys_to_ignore_on_load_unexpected = [
        r"pooler",
        r"mlm___cls",
        r"nsp___cls",
        r"cls.predictions",
        r"cls.seq_relationship",
    ]

    def __init__(self, config: BertConfig, *inputs, **kwargs):
        super().__init__(config, *inputs, **kwargs)

        self.num_labels = config.num_labels

        self.bert = TFBertMainLayer(config, add_pooling_layer=False, name="bert")

        # This is the dense layer I added 
        self.my_dense = tf.keras.layers.Dense(
            units=config.hidden_size,
            kernel_initializer=get_initializer(config.initializer_range),
            name="my_dense",
        )
        self.qa_outputs = tf.keras.layers.Dense(
            units=config.num_labels,
            kernel_initializer=get_initializer(config.initializer_range),
            name="qa_outputs",
        )

    def call(
        self,
        input_ids = None,
        attention_mask = None,
        token_type_ids = None,
        position_ids = None,
        head_mask = None,
        inputs_embeds = None,
        output_attentions = None,
        output_hidden_states = None,
        return_dict = None,
        start_positions = None,
        end_positions= None,
        training = False,
        **kwargs,
    ):
        r"""
        start_positions (`tf.Tensor` or `np.ndarray` of shape `(batch_size,)`, *optional*):
            Labels for position (index) of the start of the labelled span for computing the token classification loss.
            Positions are clamped to the length of the sequence (`sequence_length`). Position outside of the sequence
            are not taken into account for computing the loss.
        end_positions (`tf.Tensor` or `np.ndarray` of shape `(batch_size,)`, *optional*):
            Labels for position (index) of the end of the labelled span for computing the token classification loss.
            Positions are clamped to the length of the sequence (`sequence_length`). Position outside of the sequence
            are not taken into account for computing the loss.
        """
        inputs = input_processing(
            func=self.call,
            config=self.config,
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
            start_positions=start_positions,
            end_positions=end_positions,
            training=training,
            kwargs_call=kwargs,
        )
        outputs = self.bert(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            token_type_ids=inputs["token_type_ids"],
            position_ids=inputs["position_ids"],
            head_mask=inputs["head_mask"],
            inputs_embeds=inputs["inputs_embeds"],
            output_attentions=inputs["output_attentions"],
            output_hidden_states=inputs["output_hidden_states"],
            return_dict=inputs["return_dict"],
            training=inputs["training"],
        )
        sequence_output = outputs[0]

        # The added my_dense layer also has to be applied here, in the forward pass
        my_logits = self.my_dense(inputs=sequence_output)
        logits = self.qa_outputs(inputs=my_logits)
        start_logits, end_logits = tf.split(value=logits, num_or_size_splits=2, axis=-1)
        start_logits = tf.squeeze(input=start_logits, axis=-1)
        end_logits = tf.squeeze(input=end_logits, axis=-1)
        loss = None

        if inputs["start_positions"] is not None and inputs["end_positions"] is not None:
            labels = {"start_position": inputs["start_positions"]}
            labels["end_position"] = inputs["end_positions"]
            loss = self.hf_compute_loss(labels=labels, logits=(start_logits, end_logits))

        if not inputs["return_dict"]:
            output = (start_logits, end_logits) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return TFQuestionAnsweringModelOutput(
            loss=loss,
            start_logits=start_logits,
            end_logits=end_logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

    def serving_output(self, output: TFQuestionAnsweringModelOutput) -> TFQuestionAnsweringModelOutput:
        hs = tf.convert_to_tensor(output.hidden_states) if self.config.output_hidden_states else None
        attns = tf.convert_to_tensor(output.attentions) if self.config.output_attentions else None

        return TFQuestionAnsweringModelOutput(
            start_logits=output.start_logits, end_logits=output.end_logits, hidden_states=hs, attentions=attns
        )
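For completeness, the subclass is then used like the stock TFBertForQuestionAnswering. A minimal sketch, assuming start_positions and end_positions are included in the input features so the built-in loss is computed during the forward pass (the dataset name is hypothetical, and optimizer is the one created earlier):

qa_model = MY_TFBertForQuestionAnswering.from_pretrained("bert-base-multilingual-cased")

# No Keras loss needed: the inherited TFQuestionAnsweringLoss is applied inside call()
# whenever start_positions/end_positions are present in the inputs
qa_model.compile(optimizer=optimizer)
qa_model.fit(tf_train_dataset_with_labels, epochs=3)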