MLFlow and Optuna

Hello all,

I am trying to perform hyperparameter tuning using optuna as a backend and log all trials to MLFlow. So I am initializing all parameters for mlflow and I also use trainer.hyperparameter_search(). My code is the follwing:

os.environ["MLFLOW_EXPERIMENT_NAME"] = "trainer-mlflow-demo"
os.environ["MLFLOW_FLATTEN_PARAMS"] = "1"


training_args = TrainingArguments(         
    output_dir=SAVE_DIR,      
    per_device_train_batch_size=PER_DEVICE_TRAIN_BATCH,         
    per_device_eval_batch_size=PER_DEVICE_EVAL_BATCH,
    evaluation_strategy = "epoch",
    logging_strategy="epoch",
    save_strategy = "epoch")     # need save strategy order to log the trials in optuna

trainer = Trainer(
  model=None,
  args=training_args,
  train_dataset=tokenized_dset['train'],
  eval_dataset=tokenized_dset['validation'],
  model_init=model_init
)

best_trial = trainer.hyperparameter_search(
direction="minimize",
backend="optuna",
hp_space=optuna_hp_space,
n_trials=NUM_TRIALS
)

However, I face a problem when logging the parameters of each trial. I have observed that only the parameters of the 1st trial are logged and for the following trials, only the metrics are logged. There are other details missing in the following trials, such as the Run Name. In the image below, you can see the two trials I performed, with the one with full parameters being the 1st trial.

Has anyone seen this problem again? Is there a workaround to log all parameters in all trials?

Thank you in advance,
Petrina

1 Like

I am also struggling how to get each trial visible on Mlfow. Can someone please advise? Thanks!

Hello!
I have managed to record the trials and the different metrics by overwriting the MLflowCallback from transformers.integrations. In this way I have managed to give at every trial a different run name and also log all parameters.

Hope this helps a little.

1 Like

Ahhh good idea! Thanks! Do you mind sharing your code change snippet?

Hello! Yes sure, my custom MLFlowCallback is the following:

from transformers.integrations import MLflowCallback
import mlflow
from transformers.utils import generic, logging, ENV_VARS_TRUE_VALUES
import os
import json

logger = logging.get_logger(__name__)


class MLFlowCustomCallback(MLflowCallback):

    def __init__(self, hp_search=None, config_path=None) -> None:
        super().__init__()

        self._ml_flow = mlflow
        self.hp_search = hp_search
        self.trial_num = 0
        self.config_path = config_path

        self._MAX_PARAM_VAL_LENGTH = mlflow.utils.validation.MAX_PARAM_VAL_LENGTH
        self._MAX_PARAMS_TAGS_PER_BATCH = mlflow.utils.validation.MAX_PARAMS_TAGS_PER_BATCH

        self._initialized = False
        self._auto_end_run = False
        self._log_artifacts = False

        self._nested_run = None
        self._experiment_name = None
        self._flatten_params = None
        self._run_id = None

    def setup(self, args, state, model):
        """
        Setup the optional MLflow integration.

        Environment:
        - **HF_MLFLOW_LOG_ARTIFACTS** (`str`, *optional*):
            Whether to use MLflow `.log_artifact()` facility to log artifacts. This only makes sense if logging to a
            remote server, e.g. s3 or GCS. If set to `True` or *1*, will copy each saved checkpoint on each save in
            [`TrainingArguments`]'s `output_dir` to the local or remote artifact storage. Using it without a remote
            storage will just copy the files to your artifact location.
        - **MLFLOW_EXPERIMENT_NAME** (`str`, *optional*, defaults to `None`):
            Whether to use an MLflow experiment_name under which to launch the run. Default to `None` which will point
            to the `Default` experiment in MLflow. Otherwise, it is a case sensitive name of the experiment to be
            activated. If an experiment with this name does not exist, a new experiment with this name is created.
        - **MLFLOW_TAGS** (`str`, *optional*):
            A string dump of a dictionary of key/value pair to be added to the MLflow run as tags. Example:
            `os.environ['MLFLOW_TAGS']='{"release.candidate": "RC1", "release.version": "2.2.0"}'`.
        - **MLFLOW_NESTED_RUN** (`str`, *optional*):
            Whether to use MLflow nested runs. If set to `True` or *1*, will create a nested run inside the current
            run.
        - **MLFLOW_RUN_ID** (`str`, *optional*):
            Allow to reattach to an existing run which can be usefull when resuming training from a checkpoint. When
            `MLFLOW_RUN_ID` environment variable is set, `start_run` attempts to resume a run with the specified run ID
            and other parameters are ignored.
        - **MLFLOW_FLATTEN_PARAMS** (`str`, *optional*, defaults to `False`):
            Whether to flatten the parameters dictionary before logging.
        """
        self._log_artifacts = os.getenv("HF_MLFLOW_LOG_ARTIFACTS", "FALSE").upper() in ENV_VARS_TRUE_VALUES
        self._nested_run = os.getenv("MLFLOW_NESTED_RUN", "FALSE").upper() in ENV_VARS_TRUE_VALUES
        self._experiment_name = os.getenv("MLFLOW_EXPERIMENT_NAME", None)
        self._flatten_params = os.getenv("MLFLOW_FLATTEN_PARAMS", "FALSE").upper() in ENV_VARS_TRUE_VALUES
        self._run_id = os.getenv("MLFLOW_RUN_ID", None)
        logger.debug(
            f"MLflow experiment_name={self._experiment_name}, run_name={args.run_name}, nested={self._nested_run},"
            f" tags={self._nested_run}"
        )
        if state.is_world_process_zero:
            if self._ml_flow.active_run() is None or self._nested_run or self._run_id:
                if self._experiment_name:
                    # Use of set_experiment() ensure that Experiment is created if not exists
                    self._ml_flow.set_experiment(self._experiment_name)
                # condition in order to change the experiment name at every iteration - Note: at the first trial or
                # the first time that this callback is called there is already an active run - probably from the
                # original MLFlowCallback
                if self.hp_search == "optuna":
                    self._ml_flow.start_run(run_name=f"trial-{self.trial_num + 1}", nested=self._nested_run)
                    self.trial_num = self.trial_num + 1
                else:
                    self._ml_flow.start_run(run_name=args.run_name, nested=self._nested_run)

                logger.debug(f"MLflow run started with run_id={self._ml_flow.active_run().info.run_id}")
                self._auto_end_run = True
            combined_dict = args.to_dict()
            if hasattr(model, "config") and model.config is not None:
                model_config = model.config.to_dict()
                combined_dict = {**model_config, **combined_dict}
            combined_dict = generic.flatten_dict(combined_dict) if self._flatten_params else combined_dict
            # remove params that are too long for MLflow
            for name, value in list(combined_dict.items()):
                # internally, all values are converted to str in MLflow
                if len(str(value)) > self._MAX_PARAM_VAL_LENGTH:
                    logger.warning(
                        f'Trainer is attempting to log a value of "{value}" for key "{name}" as a parameter. MLflow\'s'
                        " log_param() only accepts values no longer than 250 characters so we dropped this attribute."
                        " You can use `MLFLOW_FLATTEN_PARAMS` environment variable to flatten the parameters and"
                        " avoid this message."
                    )
                    del combined_dict[name]
            # MLflow cannot log more than 100 values in one go, so we have to split it
            combined_dict_items = list(combined_dict.items())
            for i in range(0, len(combined_dict_items), self._MAX_PARAMS_TAGS_PER_BATCH):
                self._ml_flow.log_params(dict(combined_dict_items[i: i + self._MAX_PARAMS_TAGS_PER_BATCH]))
            mlflow_tags = os.getenv("MLFLOW_TAGS", None)
            if mlflow_tags:
                mlflow_tags = json.loads(mlflow_tags)
                self._ml_flow.set_tags(mlflow_tags)
            if self.config_path:
                self._ml_flow.log_artifact(self.config_path)
        self._initialized = True

    def on_train_begin(self, args, state, control, model=None, **kwargs):
        """Changed the original on_train_begin in order not to check for self._initialized when running optuna"""
        if self.hp_search == "optuna":
            self.setup(args, state, model)
        else:
            if self._initialized:
                self.setup(args, state, model)


Most of the code is the standard MLFlowCallback from Hugging face, I have just added some attributes like hp_search which in this case I change the run name at each trial to have it logged at mlflow as different run.

Hope this helps,
Petrina

2 Likes

Thanks, Petrina! Appreciate it :slight_smile:

1 Like