Source code for mlflow.data.evaluation_dataset

import json
import logging
import math
import struct
import sys

from packaging.version import Version

import mlflow
from mlflow.entities import RunTag
from mlflow.exceptions import MlflowException
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.utils import insecure_hash
from mlflow.utils.string_utils import generate_feature_name_if_not_string

try:
    # `numpy` and `pandas` are not required for `mlflow-skinny`.
    import numpy as np
    import pandas as pd
except ImportError:
    pass

_logger = logging.getLogger(__name__)


def _hash_uint64_ndarray_as_bytes(array):
    assert len(array.shape) == 1
    # see struct pack format string https://docs.python.org/3/library/struct.html#format-strings
    return struct.pack(f">{array.size}Q", *array)


def _is_empty_list_or_array(data):
    if isinstance(data, list):
        return len(data) == 0
    elif isinstance(data, np.ndarray):
        return data.size == 0
    return False


def _is_array_has_dict(nd_array):
    if _is_empty_list_or_array(nd_array):
        return False

    # It is less likely the array or list contains heterogeneous elements, so just checking the
    # first element to avoid performance overhead.
    elm = nd_array.item(0)
    if isinstance(elm, (list, np.ndarray)):
        return _is_array_has_dict(elm)
    elif isinstance(elm, dict):
        return True

    return False


def _hash_array_of_dict_as_bytes(data):
    # NB: If an array or list contains dictionary element, it can't be hashed with
    # pandas.util.hash_array. Hence we need to manually hash the elements here. This is
    # particularly for the LLM use case where the input can be a list of dictionary
    # (chat/completion payloads), so doesn't handle more complex case like nested lists.
    result = b""
    for elm in data:
        if isinstance(elm, (list, np.ndarray)):
            result += _hash_array_of_dict_as_bytes(elm)
        elif isinstance(elm, dict):
            result += _hash_dict_as_bytes(elm)
        else:
            result += _hash_data_as_bytes(elm)
    return result


def _hash_ndarray_as_bytes(nd_array):
    if not isinstance(nd_array, np.ndarray):
        nd_array = np.array(nd_array)

    if _is_array_has_dict(nd_array):
        return _hash_array_of_dict_as_bytes(nd_array)

    return _hash_uint64_ndarray_as_bytes(
        pd.util.hash_array(nd_array.flatten(order="C"))
    ) + _hash_uint64_ndarray_as_bytes(np.array(nd_array.shape, dtype="uint64"))


def _hash_data_as_bytes(data):
    try:
        if isinstance(data, (list, np.ndarray)):
            return _hash_ndarray_as_bytes(data)
        if isinstance(data, dict):
            return _hash_dict_as_bytes(data)
        if np.isscalar(data):
            return _hash_uint64_ndarray_as_bytes(pd.util.hash_array(np.array([data])))
    finally:
        return b""  # Skip unsupported types by returning an empty byte string


def _hash_dict_as_bytes(data_dict):
    result = _hash_ndarray_as_bytes(list(data_dict.keys()))
    try:
        result += _hash_ndarray_as_bytes(list(data_dict.values()))
    # If the values containing non-hashable objects, we will hash the values recursively.
    except Exception:
        for value in data_dict.values():
            result += _hash_data_as_bytes(value)
    return result


def _hash_array_like_obj_as_bytes(data):
    """
    Helper method to convert pandas dataframe/numpy array/list into bytes for
    MD5 calculation purpose.
    """
    if isinstance(data, pd.DataFrame):
        # add checking `'pyspark' in sys.modules` to avoid importing pyspark when user
        # run code not related to pyspark.
        if "pyspark" in sys.modules:
            from pyspark.ml.linalg import Vector as spark_vector_type
        else:
            spark_vector_type = None

        def _hash_array_like_element_as_bytes(v):
            if spark_vector_type is not None:
                if isinstance(v, spark_vector_type):
                    return _hash_ndarray_as_bytes(v.toArray())
            if isinstance(v, (dict, list, np.ndarray)):
                return _hash_data_as_bytes(v)

            try:
                # Attempt to hash the value, if it fails, return an empty byte string
                pd.util.hash_array(np.array([v]))
                return v
            except TypeError:
                return b""  # Skip unhashable types by returning an empty byte string

        if Version(pd.__version__) >= Version("2.1.0"):
            data = data.map(_hash_array_like_element_as_bytes)
        else:
            data = data.applymap(_hash_array_like_element_as_bytes)
        return _hash_uint64_ndarray_as_bytes(pd.util.hash_pandas_object(data))
    elif isinstance(data, np.ndarray) and len(data) > 0 and isinstance(data[0], list):
        # convert numpy array of lists into numpy array of the string representation of the lists
        # because lists are not hashable
        hashable = np.array(str(val) for val in data)
        return _hash_ndarray_as_bytes(hashable)
    elif isinstance(data, np.ndarray) and len(data) > 0 and isinstance(data[0], np.ndarray):
        # convert numpy array of numpy arrays into 2d numpy arrays
        # because numpy array of numpy arrays are not hashable
        hashable = np.array(data.tolist())
        return _hash_ndarray_as_bytes(hashable)
    elif isinstance(data, np.ndarray):
        return _hash_ndarray_as_bytes(data)
    elif isinstance(data, list):
        return _hash_ndarray_as_bytes(np.array(data))
    else:
        raise ValueError("Unsupported data type.")


def _gen_md5_for_arraylike_obj(md5_gen, data):
    """
    Helper method to generate MD5 hash array-like object, the MD5 will calculate over:
     - array length
     - first NUM_SAMPLE_ROWS_FOR_HASH rows content
     - last NUM_SAMPLE_ROWS_FOR_HASH rows content
    """
    len_bytes = _hash_uint64_ndarray_as_bytes(np.array([len(data)], dtype="uint64"))
    md5_gen.update(len_bytes)
    if len(data) < EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH * 2:
        md5_gen.update(_hash_array_like_obj_as_bytes(data))
    else:
        if isinstance(data, pd.DataFrame):
            # Access rows of pandas Df with iloc
            head_rows = data.iloc[: EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH]
            tail_rows = data.iloc[-EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH :]
        else:
            head_rows = data[: EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH]
            tail_rows = data[-EvaluationDataset.NUM_SAMPLE_ROWS_FOR_HASH :]
        md5_gen.update(_hash_array_like_obj_as_bytes(head_rows))
        md5_gen.update(_hash_array_like_obj_as_bytes(tail_rows))


def convert_data_to_mlflow_dataset(data, targets=None, predictions=None):
    """Convert input data to mlflow dataset."""
    supported_dataframe_types = [pd.DataFrame]
    if "pyspark" in sys.modules:
        from pyspark.sql import DataFrame as SparkDataFrame

        supported_dataframe_types.append(SparkDataFrame)

    if predictions is not None:
        _validate_dataset_type_supports_predictions(
            data=data, supported_predictions_dataset_types=supported_dataframe_types
        )

    if isinstance(data, list):
        # If the list is flat, we assume each element is an independent sample.
        if not isinstance(data[0], (list, np.ndarray)):
            data = [[elm] for elm in data]

        return mlflow.data.from_numpy(
            np.array(data), targets=np.array(targets) if targets else None
        )
    elif isinstance(data, np.ndarray):
        return mlflow.data.from_numpy(data, targets=targets)
    elif isinstance(data, pd.DataFrame):
        return mlflow.data.from_pandas(df=data, targets=targets, predictions=predictions)
    elif "pyspark" in sys.modules and isinstance(data, SparkDataFrame):
        return mlflow.data.from_spark(df=data, targets=targets, predictions=predictions)
    else:
        # Cannot convert to mlflow dataset, return original data.
        _logger.info(
            "Cannot convert input data to `evaluate()` to an mlflow dataset, input must be a list, "
            f"a numpy array, a panda Dataframe or a spark Dataframe, but received {type(data)}."
        )
        return data


def _validate_dataset_type_supports_predictions(data, supported_predictions_dataset_types):
    """
    Validate that the dataset type supports a user-specified "predictions" column.
    """
    if not any(isinstance(data, sdt) for sdt in supported_predictions_dataset_types):
        raise MlflowException(
            message=(
                "If predictions is specified, data must be one of the following types, or an"
                " MLflow Dataset that represents one of the following types:"
                f" {supported_predictions_dataset_types}."
            ),
            error_code=INVALID_PARAMETER_VALUE,
        )


[docs]class EvaluationDataset: """ An input dataset for model evaluation. This is intended for use with the :py:func:`mlflow.models.evaluate()` API. """ NUM_SAMPLE_ROWS_FOR_HASH = 5 SPARK_DATAFRAME_LIMIT = 10000 def __init__( self, data, *, targets=None, name=None, path=None, feature_names=None, predictions=None, ): """ The values of the constructor arguments comes from the `evaluate` call. """ if name is not None and '"' in name: raise MlflowException( message=f'Dataset name cannot include a double quote (") but got {name}', error_code=INVALID_PARAMETER_VALUE, ) if path is not None and '"' in path: raise MlflowException( message=f'Dataset path cannot include a double quote (") but got {path}', error_code=INVALID_PARAMETER_VALUE, ) self._user_specified_name = name self._path = path self._hash = None self._supported_dataframe_types = (pd.DataFrame,) self._spark_df_type = None self._labels_data = None self._targets_name = None self._has_targets = False self._predictions_data = None self._predictions_name = None self._has_predictions = predictions is not None try: # add checking `'pyspark' in sys.modules` to avoid importing pyspark when user # run code not related to pyspark. if "pyspark" in sys.modules: from pyspark.sql import DataFrame as SparkDataFrame self._supported_dataframe_types = (pd.DataFrame, SparkDataFrame) self._spark_df_type = SparkDataFrame except ImportError: pass if feature_names is not None and len(set(feature_names)) < len(list(feature_names)): raise MlflowException( message="`feature_names` argument must be a list containing unique feature names.", error_code=INVALID_PARAMETER_VALUE, ) if self._has_predictions: _validate_dataset_type_supports_predictions( data=data, supported_predictions_dataset_types=self._supported_dataframe_types, ) has_targets = targets is not None if has_targets: self._has_targets = True if isinstance(data, (np.ndarray, list)): if has_targets and not isinstance(targets, (np.ndarray, list)): raise MlflowException( message="If data is a numpy array or list of evaluation features, " "`targets` argument must be a numpy array or list of evaluation labels.", error_code=INVALID_PARAMETER_VALUE, ) shape_message = ( "If the `data` argument is a numpy array, it must be a 2-dimensional " "array, with the second dimension representing the number of features. If the " "`data` argument is a list, each of its elements must be a feature array of " "the numpy array or list, and all elements must have the same length." ) if isinstance(data, list): try: data = np.array(data) except ValueError as e: raise MlflowException( message=shape_message, error_code=INVALID_PARAMETER_VALUE ) from e if len(data.shape) != 2: raise MlflowException( message=shape_message, error_code=INVALID_PARAMETER_VALUE, ) self._features_data = data if has_targets: self._labels_data = ( targets if isinstance(targets, np.ndarray) else np.array(targets) ) if len(self._features_data) != len(self._labels_data): raise MlflowException( message="The input features example rows must be the same length " "with labels array.", error_code=INVALID_PARAMETER_VALUE, ) num_features = data.shape[1] if feature_names is not None: feature_names = list(feature_names) if num_features != len(feature_names): raise MlflowException( message="feature name list must be the same length with feature data.", error_code=INVALID_PARAMETER_VALUE, ) self._feature_names = feature_names else: self._feature_names = [ f"feature_{str(i + 1).zfill(math.ceil(math.log10(num_features + 1)))}" for i in range(num_features) ] elif isinstance(data, self._supported_dataframe_types): if has_targets and not isinstance(targets, str): raise MlflowException( message="If data is a Pandas DataFrame or Spark DataFrame, `targets` argument " "must be the name of the column which contains evaluation labels in the `data` " "dataframe.", error_code=INVALID_PARAMETER_VALUE, ) if self._spark_df_type and isinstance(data, self._spark_df_type): if data.count() > EvaluationDataset.SPARK_DATAFRAME_LIMIT: _logger.warning( "Specified Spark DataFrame is too large for model evaluation. Only " f"the first {EvaluationDataset.SPARK_DATAFRAME_LIMIT} rows will be used. " "If you want evaluate on the whole spark dataframe, please manually call " "`spark_dataframe.toPandas()`." ) data = data.limit(EvaluationDataset.SPARK_DATAFRAME_LIMIT).toPandas() if has_targets: self._labels_data = data[targets].to_numpy() self._targets_name = targets if self._has_predictions: self._predictions_data = data[predictions].to_numpy() self._predictions_name = predictions if feature_names is not None: self._features_data = data[list(feature_names)] self._feature_names = feature_names else: features_data = data if has_targets: features_data = features_data.drop(targets, axis=1, inplace=False) if self._has_predictions: features_data = features_data.drop(predictions, axis=1, inplace=False) self._features_data = features_data self._feature_names = [ generate_feature_name_if_not_string(c) for c in self._features_data.columns ] else: raise MlflowException( message="The data argument must be a numpy array, a list or a Pandas DataFrame, or " "spark DataFrame if pyspark package installed.", error_code=INVALID_PARAMETER_VALUE, ) # generate dataset hash md5_gen = insecure_hash.md5() _gen_md5_for_arraylike_obj(md5_gen, self._features_data) if self._labels_data is not None: _gen_md5_for_arraylike_obj(md5_gen, self._labels_data) if self._predictions_data is not None: _gen_md5_for_arraylike_obj(md5_gen, self._predictions_data) md5_gen.update(",".join(list(map(str, self._feature_names))).encode("UTF-8")) self._hash = md5_gen.hexdigest() @property def feature_names(self): return self._feature_names @property def features_data(self): """ return features data as a numpy array or a pandas DataFrame. """ return self._features_data @property def labels_data(self): """ return labels data as a numpy array """ return self._labels_data @property def has_targets(self): """ Returns True if the dataset has targets, False otherwise. """ return self._has_targets @property def targets_name(self): """ return targets name """ return self._targets_name @property def predictions_data(self): """ return labels data as a numpy array """ return self._predictions_data @property def has_predictions(self): """ Returns True if the dataset has targets, False otherwise. """ return self._has_predictions @property def predictions_name(self): """ return predictions name """ return self._predictions_name @property def name(self): """ Dataset name, which is specified dataset name or the dataset hash if user don't specify name. """ return self._user_specified_name if self._user_specified_name is not None else self.hash @property def path(self): """ Dataset path """ return self._path @property def hash(self): """ Dataset hash, includes hash on first 20 rows and last 20 rows. """ return self._hash @property def _metadata(self): """ Return dataset metadata containing name, hash, and optional path. """ metadata = { "name": self.name, "hash": self.hash, } if self.path is not None: metadata["path"] = self.path return metadata def _log_dataset_tag(self, client, run_id, model_uuid): """ Log dataset metadata as a tag "mlflow.datasets", if the tag already exists, it will append current dataset metadata into existing tag content. """ existing_dataset_metadata_str = client.get_run(run_id).data.tags.get( "mlflow.datasets", "[]" ) dataset_metadata_list = json.loads(existing_dataset_metadata_str) for metadata in dataset_metadata_list: if ( metadata["hash"] == self.hash and metadata["name"] == self.name and metadata["model"] == model_uuid ): break else: dataset_metadata_list.append({**self._metadata, "model": model_uuid}) dataset_metadata_str = json.dumps(dataset_metadata_list, separators=(",", ":")) client.log_batch( run_id, tags=[RunTag("mlflow.datasets", dataset_metadata_str)], ) def __hash__(self): return hash(self.hash) def __eq__(self, other): if not isinstance(other, EvaluationDataset): return False if isinstance(self._features_data, np.ndarray): is_features_data_equal = np.array_equal(self._features_data, other._features_data) else: is_features_data_equal = self._features_data.equals(other._features_data) return ( is_features_data_equal and np.array_equal(self._labels_data, other._labels_data) and self.name == other.name and self.path == other.path and self._feature_names == other._feature_names )