Autopipeline to handle various scenarios

I’m working on enhancing our autopipeline to handle various scenarios. Here’s the current state of my code.

import inspect
import os
from collections import OrderedDict

import torch
from diffusers import (DiffusionPipeline,
from diffusers.configuration_utils import ConfigMixin
from diffusers.loaders import FromSingleFileMixin
from diffusers.pipelines import (StableDiffusionUpscalePipeline, StableDiffusionLatentUpscalePipeline)
from diffusers.pipelines.controlnet import (
from diffusers.pipelines.deepfloyd_if import IFImg2ImgPipeline, IFInpaintingPipeline, IFPipeline
from diffusers.pipelines.kandinsky import (
from diffusers.pipelines.kandinsky2_2 import (
from diffusers.pipelines.stable_diffusion import (
from diffusers.pipelines.stable_diffusion_xl import (

from nukesd.utils.logger import logger

        ("stable-diffusion", StableDiffusionPipeline),
        ("stable-diffusion-xl", StableDiffusionXLPipeline),
        ("stable-diffusion-xl-refiner", StableDiffusionXLPipeline),
        ("if", IFPipeline),
        ("InstructPix2", StableDiffusionInstructPix2PixPipeline),
        ("kandinsky", KandinskyCombinedPipeline),
        ("kandinskyPrior", KandinskyPriorPipeline),
        ("kandinsky22Prior", KandinskyV22PriorPipeline),
        ("kandinsky22", KandinskyV22CombinedPipeline),
        ("stable-diffusion-controlnet", StableDiffusionControlNetPipeline),
        ("stable-diffusion-xl-controlnet", StableDiffusionXLControlNetPipeline),

        ("stable-diffusion", StableDiffusionImg2ImgPipeline),
        ("stable-diffusion-xl", StableDiffusionXLImg2ImgPipeline),
        ("stable-diffusion-xl-refiner", StableDiffusionXLImg2ImgPipeline),
        ("if", IFImg2ImgPipeline),
        ("InstructPix2", StableDiffusionInstructPix2PixPipeline),
        ("kandinsky", KandinskyImg2ImgCombinedPipeline),
        ("kandinsky22", KandinskyV22Img2ImgCombinedPipeline),
        ("stable-diffusion-controlnet", StableDiffusionControlNetImg2ImgPipeline),

        ("stable-diffusion", StableDiffusionInpaintPipeline),
        ("stable-diffusion-xl", StableDiffusionXLInpaintPipeline),
        ("stable-diffusion-xl-refiner", StableDiffusionXLInpaintPipeline),
        ("if", IFInpaintingPipeline),
        ("kandinsky", KandinskyInpaintCombinedPipeline),
        ("kandinsky22", KandinskyV22InpaintCombinedPipeline),
        ("stable-diffusion-controlnet", StableDiffusionControlNetInpaintPipeline),

        ("kandinsky", KandinskyPipeline),
        ("kandinsky22", KandinskyV22Pipeline),
        ("kandinsky", KandinskyImg2ImgPipeline),
        ("kandinsky22", KandinskyV22Img2ImgPipeline),
        ("kandinsky", KandinskyInpaintPipeline),
        ("kandinsky22", KandinskyV22InpaintPipeline),
        ('upscaler', StableDiffusionUpscalePipeline),
        ('upscaler-latent', StableDiffusionLatentUpscalePipeline),

        ('paint-by-example', PaintByExamplePipeline),
        ('re-paint', RePaintPipeline),
        ('variation', StableDiffusionImageVariationPipeline)



def get_original_var_name(var):
    name = None
    for frame in inspect.stack()[2:]:
        frame_locals = frame[0].f_locals
        names = [name for name, val in frame_locals.items() if var is val]
        if names:
            name = names[0]
    return name

def _get_connected_pipeline(pipeline_cls):
    # for now connected pipelines can only be loaded from decoder pipelines, such as kandinsky-community/kandinsky-2-2-decoder
    if pipeline_cls in _AUTO_TEXT2IMAGE_DECODER_PIPELINES_MAPPING.values():
        return _get_task_class(
            AUTO_TEXT2IMAGE_PIPELINES_MAPPING, pipeline_cls.__name__, throw_error_if_not_exist=False
    if pipeline_cls in _AUTO_IMAGE2IMAGE_DECODER_PIPELINES_MAPPING.values():
        return _get_task_class(
            AUTO_IMAGE2IMAGE_PIPELINES_MAPPING, pipeline_cls.__name__, throw_error_if_not_exist=False
    if pipeline_cls in _AUTO_INPAINT_DECODER_PIPELINES_MAPPING.values():
        return _get_task_class(AUTO_INPAINT_PIPELINES_MAPPING, pipeline_cls.__name__, throw_error_if_not_exist=False)

def _get_task_class(mapping, pipeline_class_name, throw_error_if_not_exist: bool = True):
    def get_model(pipeline_class_name):
        for task_mapping in SUPPORTED_TASKS_MAPPINGS:
            for model_name, pipeline in task_mapping.items():
                if pipeline.__name__ == pipeline_class_name:
                    return model_name, task_mapping
        return None, None

    model_name, found_mapping = get_model(pipeline_class_name)

    if model_name is not None:
        task_class = mapping.get(model_name, None)
        if task_class is None:
            task_class = found_mapping.get(model_name, None)
            logger.warning(f"AutoPipeline couldn't find {pipeline_class_name} for {model_name}"
                           f" in :{get_original_var_name(mapping)} instead found in {get_original_var_name(found_mapping)}")
        return task_class

    if throw_error_if_not_exist:
        raise ValueError(f"AutoPipeline can't find a pipeline linked to {pipeline_class_name} for {model_name}")

def _get_signature_keys(obj):
    parameters = inspect.signature(obj.__init__).parameters
    required_parameters = {k: v for k, v in parameters.items() if v.default == inspect._empty}
    optional_parameters = set({k for k, v in parameters.items() if v.default != inspect._empty})
    expected_modules = set(required_parameters.keys()) - {"self"}
    return expected_modules, optional_parameters

def get_base_class_type_from_checkpoint(checkpoint):
    key_name_v2_1 = "model.diffusion_model.input_blocks.2.1.transformer_blocks.0.attn2.to_k.weight"
    key_name_sd_xl_base = "conditioner.embedders.1.model.transformer.resblocks.9.mlp.c_proj.bias"
    key_name_sd_xl_refiner = "conditioner.embedders.0.model.transformer.resblocks.9.mlp.c_proj.bias"

    # model_type = "v1"
    pipeline_name = "stable-diffusion"

    if key_name_v2_1 in checkpoint and checkpoint[key_name_v2_1].shape[-1] == 1024:
        # model_type = "v2"
        pipeline_name = "stable-diffusion"

    elif key_name_sd_xl_base in checkpoint:
        # only base xl has two text embedders
        pipeline_name = "stable-diffusion-xl"
    elif key_name_sd_xl_refiner in checkpoint:
        pipeline_name = "stable-diffusion-xl-refiner"
    return pipeline_name

def get_config_from_single_file(input_path, device=None):
    file_extension = input_path.rsplit(".", 1)[-1]
    from_safetensors = file_extension == "safetensors"

    if from_safetensors:
        from safetensors.torch import load_file as safe_load

        checkpoint = safe_load(input_path, device="cpu")
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
            checkpoint = torch.load(input_path, map_location=device)
            checkpoint = torch.load(input_path, map_location=device)

    return checkpoint

def from_single_file_class_factory(base_cls):
    if issubclass(base_cls, FromSingleFileMixin):
        return base_cls
        return type(
            (base_cls, FromSingleFileMixin),

class AutoPipeline(ConfigMixin, FromSingleFileMixin):

    [`AutoPipelineForText2Image`] is a generic pipeline class that instantiates a text-to-image pipeline class. The
    specific underlying pipeline class is automatically selected from either the
    [`~AutoPipelineForText2Image.from_pretrained`] or [`~AutoPipelineForText2Image.from_pipe`] methods.

    This class cannot be instantiated using `__init__()` (throws an error).

    Class attributes:

        - **config_name** (`str`) -- The configuration filename that stores the class and module names of all the
          diffusion pipeline's components.

    config_name = "model_index.json"

    def __init__(self, *args, **kwargs):
        raise EnvironmentError(
            f"{self.__class__.__name__} is designed to be instantiated "
            f"using the `{self.__class__.__name__}.from_pretrained(pretrained_model_name_or_path)` or "
            f"`{self.__class__.__name__}.from_pipe(pipeline)` methods."

    def from_pretrained(cls, pretrained_model_or_path, out_type='txt2img', **kwargs):
        Instantiates a text-to-image Pytorch diffusion pipeline from pretrained pipeline weight.

        The from_pretrained() method takes care of returning the correct pipeline class instance by:
            1. Detect the pipeline class of the pretrained_model_or_path based on the _class_name property of its
               config object
            2. Find the text-to-image pipeline linked to the pipeline class using pattern matching on pipeline class

        If a `controlnet` argument is passed, it will instantiate a [`StableDiffusionControlNetPipeline`] object.

        The pipeline is set in evaluation mode (`model.eval()`) by default.

        If you get the error message below, you need to finetune the weights for your downstream task:

        Some weights of UNet2DConditionModel were not initialized from the model checkpoint at runwayml/stable-diffusion-v1-5 and are newly initialized because the shapes did not match:
        - conv_in.weight: found shape torch.Size([320, 4, 3, 3]) in the checkpoint and torch.Size([320, 9, 3, 3]) in the model instantiated
        You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.

            pretrained_model_or_path (`str` or `os.PathLike`, *optional*):
                Can be either:

                    - A string, the *repo id* (for example `CompVis/ldm-text2im-large-256`) of a pretrained pipeline
                      hosted on the Hub.
                    - A path to a *directory* (for example `./my_pipeline_directory/`) containing pipeline weights
                      saved using
            out_type (`str`, *optional*):
                txt2img, img2img, inpaint
            torch_dtype (`str` or `torch.dtype`, *optional*):
                Override the default `torch.dtype` and load the model with another dtype. If "auto" is passed, the
                dtype is automatically derived from the model's weights.
            force_download (`bool`, *optional*, defaults to `False`):
                Whether or not to force the (re-)download of the model weights and configuration files, overriding the
                cached versions if they exist.
            cache_dir (`Union[str, os.PathLike]`, *optional*):
                Path to a directory where a downloaded pretrained model configuration is cached if the standard cache
                is not used.
            resume_download (`bool`, *optional*, defaults to `False`):
                Whether or not to resume downloading the model weights and configuration files. If set to `False`, any
                incompletely downloaded files are deleted.
            proxies (`Dict[str, str]`, *optional*):
                A dictionary of proxy servers to use by protocol or endpoint, for example, `{'http': '',
                'http://hostname': ''}`. The proxies are used on each request.
            output_loading_info(`bool`, *optional*, defaults to `False`):
                Whether or not to also return a dictionary containing missing keys, unexpected keys and error messages.
            local_files_only (`bool`, *optional*, defaults to `False`):
                Whether to only load local model weights and configuration files or not. If set to `True`, the model
                won't be downloaded from the Hub.
            use_auth_token (`str` or *bool*, *optional*):
                The token to use as HTTP bearer authorization for remote files. If `True`, the token generated from
                `diffusers-cli login` (stored in `~/.huggingface`) is used.
            revision (`str`, *optional*, defaults to `"main"`):
                The specific model version to use. It can be a branch name, a tag name, a commit id, or any identifier
                allowed by Git.
            custom_revision (`str`, *optional*, defaults to `"main"`):
                The specific model version to use. It can be a branch name, a tag name, or a commit id similar to
                `revision` when loading a custom pipeline from the Hub. It can be a 🤗 Diffusers version when loading a
                custom pipeline from GitHub, otherwise it defaults to `"main"` when loading from the Hub.
            mirror (`str`, *optional*):
                Mirror source to resolve accessibility issues if you’re downloading a model in China. We do not
                guarantee the timeliness or safety of the source, and you should refer to the mirror site for more
            device_map (`str` or `Dict[str, Union[int, str, torch.device]]`, *optional*):
                A map that specifies where each submodule should go. It doesn’t need to be defined for each
                parameter/buffer name; once a given module name is inside, every submodule of it will be sent to the
                same device.

                Set `device_map="auto"` to have 🤗 Accelerate automatically compute the most optimized `device_map`. For
                more information about each option see [designing a device
            max_memory (`Dict`, *optional*):
                A dictionary device identifier for the maximum memory. Will default to the maximum memory available for
                each GPU and the available CPU RAM if unset.
            offload_folder (`str` or `os.PathLike`, *optional*):
                The path to offload weights if device_map contains the value `"disk"`.
            offload_state_dict (`bool`, *optional*):
                If `True`, temporarily offloads the CPU state dict to the hard drive to avoid running out of CPU RAM if
                the weight of the CPU state dict + the biggest shard of the checkpoint does not fit. Defaults to `True`
                when there is some disk offload.
            low_cpu_mem_usage (`bool`, *optional*, defaults to `True` if torch version >= 1.9.0 else `False`):
                Speed up model loading only loading the pretrained weights and not initializing the weights. This also
                tries to not use more than 1x model size in CPU memory (including peak memory) while loading the model.
                Only supported for PyTorch >= 1.9.0. If you are using an older version of PyTorch, setting this
                argument to `True` will raise an error.
            use_safetensors (`bool`, *optional*, defaults to `None`):
                If set to `None`, the safetensors weights are downloaded if they're available **and** if the
                safetensors library is installed. If set to `True`, the model is forcibly loaded from safetensors
                weights. If set to `False`, safetensors weights are not loaded.
            kwargs (remaining dictionary of keyword arguments, *optional*):
                Can be used to overwrite load and saveable variables (the pipeline components of the specific pipeline
                class). The overwritten components are passed directly to the pipelines `__init__` method. See example
                below for more information.
            variant (`str`, *optional*):
                Load weights from a specified variant filename such as `"fp16"` or `"ema"`. This is ignored when
                loading `from_flax`.


        To use private or [gated]( models, log-in with
        `huggingface-cli login`.



        # >>> from diffusers import AutoPipelineForText2Image
        # >>> pipeline = AutoPipelineForText2Image.from_pretrained("runwayml/stable-diffusion-v1-5")
        # >>> image = pipeline(prompt).images[0]
        config = cls.load_config(pretrained_model_or_path)
        original_cls_name = config["_class_name"]

        # if 'upscale' in original_cls_name.lower():
        #     out_type = 'upscale'

        if "controlnet" in kwargs:
            original_cls_name = config["_class_name"].replace("Pipeline", "ControlNetPipeline")

        pipelines_mapping = PIPELINES_MAPPING.get(out_type, AUTO_TEXT2IMAGE_PIPELINES_MAPPING)
        pipeline_cls = _get_task_class(pipelines_mapping, original_cls_name)
        return pipeline_cls.from_pretrained(pretrained_model_or_path, **kwargs)

    def from_pipe(cls, pipeline, out_type='txt2img', **kwargs):
        Instantiates a text-to-image Pytorch diffusion pipeline from another instantiated diffusion pipeline class.

        The from_pipe() method takes care of returning the correct pipeline class instance by finding the text-to-image
        pipeline linked to the pipeline class using pattern matching on pipeline class name.

        All the modules the pipeline contains will be used to initialize the new pipeline without reallocating
        additional memoery.

        The pipeline is set in evaluation mode (`model.eval()`) by default.

            pipeline (`DiffusionPipeline`):
                an instantiated `DiffusionPipeline` object
            out_type (`str`):
                txt2img, img2img, inpaint

        # >>> from diffusers import AutoPipelineForText2Image, AutoPipelineForImage2Image
        # >>> pipe_i2i = AutoPipelineForImage2Image.from_pretrained(
        # ...     "runwayml/stable-diffusion-v1-5", requires_safety_checker=False
        # ... )
        # >>> pipe_t2i = AutoPipelineForText2Image.from_pipe(pipe_i2i)
        # >>> image = pipe_t2i(prompt).images[0]


        original_config = dict(pipeline.config)
        original_cls_name = pipeline.__class__.__name__

        # derive the pipeline class to instantiate
        pipelines_mapping = PIPELINES_MAPPING.get(out_type, AUTO_TEXT2IMAGE_PIPELINES_MAPPING)
        pipeline_cls = _get_task_class(pipelines_mapping, original_cls_name)

        # define expected module and optional kwargs given the pipeline signature
        expected_modules, optional_kwargs = _get_signature_keys(pipeline_cls)

        pretrained_model_name_or_path = original_config.pop("_name_or_path", None)

        # allow users pass modules in `kwargs` to override the original pipeline's components
        passed_class_obj = {k: kwargs.pop(k) for k in expected_modules if k in kwargs}
        original_class_obj = {
            k: pipeline.components[k]
            for k, v in pipeline.components.items()
            if k in expected_modules and k not in passed_class_obj

        # allow users pass optional kwargs to override the original pipelines config attribute
        passed_pipe_kwargs = {k: kwargs.pop(k) for k in optional_kwargs if k in kwargs}
        original_pipe_kwargs = {
            k: original_config[k]
            for k, v in original_config.items()
            if k in optional_kwargs and k not in passed_pipe_kwargs

        # config that were not expected by original pipeline is stored as private attribute
        # we will pass them as optional arguments if they can be accepted by the pipeline
        additional_pipe_kwargs = [
            for k in original_config.keys()
            if k.startswith("_") and k[1:] in optional_kwargs and k[1:] not in passed_pipe_kwargs
        for k in additional_pipe_kwargs:
            original_pipe_kwargs[k] = original_config.pop(f"_{k}")

        cls_kwargs = {**passed_class_obj, **original_class_obj, **passed_pipe_kwargs, **original_pipe_kwargs}

        # store unused config as private attribute
        unused_original_config = {
            f"{'' if k.startswith('_') else '_'}{k}": original_config[k]
            for k, v in original_config.items()
            if k not in cls_kwargs

        missing_modules = set(expected_modules) - set(pipeline._optional_components) - set(cls_kwargs.keys())

        if len(missing_modules) > 0:
            raise ValueError(
                f"Pipeline {pipeline_cls} expected {expected_modules}, but only {set(list(passed_class_obj.keys()) + list(original_class_obj.keys()))} were passed"

        model = pipeline_cls(**cls_kwargs)

        return model

    def from_single_file(cls, inp, **kwargs):

        orig_class_name = get_base_class_type_from_checkpoint(get_config_from_single_file(inp))
        pipe_cls = AUTO_TEXT2IMAGE_PIPELINES_MAPPING.get(orig_class_name)
        pipe_cls = from_single_file_class_factory(pipe_cls)
        return pipe_cls.from_single_file(inp, **kwargs)

    def get(cls, inp, out_type='txt2img', **kwargs):
        if isinstance(inp, ConfigMixin):
            pipe_func = cls.from_pipe
        elif os.path.isfile(inp):
            pipe_func = cls.from_single_file
        elif os.path.isdir(inp) or '/' in inp or '\\' in inp:
            pipe_func = cls.from_pretrained

            raise f"{inp} The path does not exist."

        pipe = pipe_func(inp, **kwargs)
        if out_type.lower() in ['img2img', 'inpaint']:
            return cls.from_pipe(pipe, out_type=out_type.lower(), **kwargs)
        return pipe