Custom Pipeline

I’m trying to create a custom pipeline

  1. preprocess (split the tokenized text into chunks of close to 2048 tokens)
  2. Feed the tokenized chunks into a HF model
  3. combine the (detokenized) output of the model with the original, untokenized text

I already have the chunking code, I’m just not sure how the pipeline is supposed to be coded. The documentation on HF isn’t very useful… Thanks!

class MyPipeline(Pipeline):
    """Custom pipeline: chunk tokenized text, run a seq2seq model on the
    chunks, and combine the decoded output with the original raw text.

    Stage contract (per the HF custom-pipeline protocol):
      preprocess  -> dict of model inputs (plus the raw text, threaded through)
      _forward    -> dict of model outputs (raw text still threaded through)
      postprocess -> final combined string
    """

    def __init__(self, path: str, max_len: int = 2048, **kwargs):
        self.max_len = max_len
        model = AutoModelForSeq2SeqLM.from_pretrained(path)
        tokenizer = AutoTokenizer.from_pretrained(path)
        # Pipeline.__init__ is what wires up self.model, self.tokenizer,
        # device placement, and __call__ dispatch — it MUST be called.
        # (Original bug: super().__init__ was never invoked, and the model/
        # tokenizer were stored as model2/tokenizer2 while _forward read
        # self.model, which did not exist.)
        super().__init__(model=model, tokenizer=tokenizer, **kwargs)

    def _sanitize_parameters(self, **kwargs):
        """Split call-time kwargs into (preprocess, forward, postprocess) dicts.

        Do NOT stash per-call state on self here (original bug: it set
        self.input): this method only routes kwargs; pipeline calls may
        overlap, so per-call data must flow through the stage dicts instead.
        """
        preprocess_kwargs = {}
        if "max_len" in kwargs:
            preprocess_kwargs["max_len"] = kwargs["max_len"]
        return preprocess_kwargs, {}, {}

    def preprocess(self, inputs, max_len=None):
        """Tokenize `inputs` and chunk the ids to at most `max_len` tokens.

        `max_len` defaults to the value given at construction time, so the
        framework can call preprocess without it (original bug: the parameter
        was required but _sanitize_parameters never supplied it).
        """
        if max_len is None:
            max_len = self.max_len
        encoded_inputs = self.tokenizer(inputs)
        id_list = ChunkTokenizedIds(encoded_inputs['input_ids'], max_len)[0]
        # Carry the raw text through the stages so postprocess can combine
        # with it — instance attributes are not safe for per-call data.
        return {"model_input": id_list, "raw_text": inputs}

    def _forward(self, model_inputs):
        raw_text = model_inputs.pop("raw_text")
        # A seq2seq model produces text via generate(); calling
        # self.model(**...) with a "model_input" key (original code) is not
        # a valid forward signature and returns logits, not token ids.
        generated = self.model.generate(model_inputs["model_input"])
        return {"generated": generated, "raw_text": raw_text}

    def postprocess(self, model_outputs):
        """Decode the generated ids and prepend them to the original text."""
        # Original bugs: referenced a bare `tokenizer` (undefined name) and
        # `self.inputs` (never assigned). Use self.tokenizer and the raw
        # text threaded through the stage dicts instead.
        output_text = self.tokenizer.decode(
            model_outputs["generated"][0], skip_special_tokens=True
        )
        return "[%s]\n%s\n" % (output_text, model_outputs["raw_text"])