Hello! Yes sure, my custom MLFlowCallback is the following:
from transformers.integrations import MLflowCallback
import mlflow
from transformers.utils import generic, logging, ENV_VARS_TRUE_VALUES
import os
import json
logger = logging.get_logger(__name__)
class MLFlowCustomCallback(MLflowCallback):
def __init__(self, hp_search=None, config_path=None) -> None:
super().__init__()
self._ml_flow = mlflow
self.hp_search = hp_search
self.trial_num = 0
self.config_path = config_path
self._MAX_PARAM_VAL_LENGTH = mlflow.utils.validation.MAX_PARAM_VAL_LENGTH
self._MAX_PARAMS_TAGS_PER_BATCH = mlflow.utils.validation.MAX_PARAMS_TAGS_PER_BATCH
self._initialized = False
self._auto_end_run = False
self._log_artifacts = False
self._nested_run = None
self._experiment_name = None
self._flatten_params = None
self._run_id = None
def setup(self, args, state, model):
"""
Setup the optional MLflow integration.
Environment:
- **HF_MLFLOW_LOG_ARTIFACTS** (`str`, *optional*):
Whether to use MLflow `.log_artifact()` facility to log artifacts. This only makes sense if logging to a
remote server, e.g. s3 or GCS. If set to `True` or *1*, will copy each saved checkpoint on each save in
[`TrainingArguments`]'s `output_dir` to the local or remote artifact storage. Using it without a remote
storage will just copy the files to your artifact location.
- **MLFLOW_EXPERIMENT_NAME** (`str`, *optional*, defaults to `None`):
Whether to use an MLflow experiment_name under which to launch the run. Default to `None` which will point
to the `Default` experiment in MLflow. Otherwise, it is a case sensitive name of the experiment to be
activated. If an experiment with this name does not exist, a new experiment with this name is created.
- **MLFLOW_TAGS** (`str`, *optional*):
A string dump of a dictionary of key/value pair to be added to the MLflow run as tags. Example:
`os.environ['MLFLOW_TAGS']='{"release.candidate": "RC1", "release.version": "2.2.0"}'`.
- **MLFLOW_NESTED_RUN** (`str`, *optional*):
Whether to use MLflow nested runs. If set to `True` or *1*, will create a nested run inside the current
run.
- **MLFLOW_RUN_ID** (`str`, *optional*):
Allow to reattach to an existing run which can be usefull when resuming training from a checkpoint. When
`MLFLOW_RUN_ID` environment variable is set, `start_run` attempts to resume a run with the specified run ID
and other parameters are ignored.
- **MLFLOW_FLATTEN_PARAMS** (`str`, *optional*, defaults to `False`):
Whether to flatten the parameters dictionary before logging.
"""
self._log_artifacts = os.getenv("HF_MLFLOW_LOG_ARTIFACTS", "FALSE").upper() in ENV_VARS_TRUE_VALUES
self._nested_run = os.getenv("MLFLOW_NESTED_RUN", "FALSE").upper() in ENV_VARS_TRUE_VALUES
self._experiment_name = os.getenv("MLFLOW_EXPERIMENT_NAME", None)
self._flatten_params = os.getenv("MLFLOW_FLATTEN_PARAMS", "FALSE").upper() in ENV_VARS_TRUE_VALUES
self._run_id = os.getenv("MLFLOW_RUN_ID", None)
logger.debug(
f"MLflow experiment_name={self._experiment_name}, run_name={args.run_name}, nested={self._nested_run},"
f" tags={self._nested_run}"
)
if state.is_world_process_zero:
if self._ml_flow.active_run() is None or self._nested_run or self._run_id:
if self._experiment_name:
# Use of set_experiment() ensure that Experiment is created if not exists
self._ml_flow.set_experiment(self._experiment_name)
# condition in order to change the experiment name at every iteration - Note: at the first trial or
# the first time that this callback is called there is already an active run - probably from the
# original MLFlowCallback
if self.hp_search == "optuna":
self._ml_flow.start_run(run_name=f"trial-{self.trial_num + 1}", nested=self._nested_run)
self.trial_num = self.trial_num + 1
else:
self._ml_flow.start_run(run_name=args.run_name, nested=self._nested_run)
logger.debug(f"MLflow run started with run_id={self._ml_flow.active_run().info.run_id}")
self._auto_end_run = True
combined_dict = args.to_dict()
if hasattr(model, "config") and model.config is not None:
model_config = model.config.to_dict()
combined_dict = {**model_config, **combined_dict}
combined_dict = generic.flatten_dict(combined_dict) if self._flatten_params else combined_dict
# remove params that are too long for MLflow
for name, value in list(combined_dict.items()):
# internally, all values are converted to str in MLflow
if len(str(value)) > self._MAX_PARAM_VAL_LENGTH:
logger.warning(
f'Trainer is attempting to log a value of "{value}" for key "{name}" as a parameter. MLflow\'s'
" log_param() only accepts values no longer than 250 characters so we dropped this attribute."
" You can use `MLFLOW_FLATTEN_PARAMS` environment variable to flatten the parameters and"
" avoid this message."
)
del combined_dict[name]
# MLflow cannot log more than 100 values in one go, so we have to split it
combined_dict_items = list(combined_dict.items())
for i in range(0, len(combined_dict_items), self._MAX_PARAMS_TAGS_PER_BATCH):
self._ml_flow.log_params(dict(combined_dict_items[i: i + self._MAX_PARAMS_TAGS_PER_BATCH]))
mlflow_tags = os.getenv("MLFLOW_TAGS", None)
if mlflow_tags:
mlflow_tags = json.loads(mlflow_tags)
self._ml_flow.set_tags(mlflow_tags)
if self.config_path:
self._ml_flow.log_artifact(self.config_path)
self._initialized = True
def on_train_begin(self, args, state, control, model=None, **kwargs):
"""Changed the original on_train_begin in order not to check for self._initialized when running optuna"""
if self.hp_search == "optuna":
self.setup(args, state, model)
else:
if self._initialized:
self.setup(args, state, model)
Most of the code is the standard MLFlowCallback from Hugging face, I have just added some attributes like hp_search which in this case I change the run name at each trial to have it logged at mlflow as different run.
Hope this helps,
Petrina