import json
import logging
from functools import cached_property
from typing import Any, Optional, Union
import pandas as pd
from mlflow.data.dataset import Dataset
from mlflow.data.dataset_source import DatasetSource
from mlflow.data.digest_utils import compute_pandas_digest
from mlflow.data.evaluation_dataset import EvaluationDataset
from mlflow.data.pyfunc_dataset_mixin import PyFuncConvertibleDatasetMixin, PyFuncInputsOutputs
from mlflow.exceptions import MlflowException
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.types import Schema
from mlflow.types.utils import _infer_schema
# Module-level logger; used below to warn when schema inference fails.
_logger = logging.getLogger(__name__)
class PandasDataset(Dataset, PyFuncConvertibleDatasetMixin):
    """
    Represents a Pandas DataFrame for use with MLflow Tracking.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        source: DatasetSource,
        targets: Optional[str] = None,
        name: Optional[str] = None,
        digest: Optional[str] = None,
        predictions: Optional[str] = None,
    ):
        """
        Args:
            df: A pandas DataFrame.
            source: The source of the pandas DataFrame.
            targets: The name of the target column. Optional. If specified, this column
                must be present in the dataframe (``df``).
            name: The name of the dataset. E.g. "wiki_train". If unspecified, a name is
                automatically generated.
            digest: The digest (hash, fingerprint) of the dataset. If unspecified, a digest
                is automatically computed.
            predictions: Optional. The name of the column containing model predictions,
                if the dataset contains model predictions. If specified, this column
                must be present in the dataframe (``df``).

        Raises:
            MlflowException: With error code ``INVALID_PARAMETER_VALUE`` if ``targets`` or
                ``predictions`` is given but is not a column of ``df``.
        """
        if targets is not None and targets not in df.columns:
            raise MlflowException(
                f"The specified pandas DataFrame does not contain the specified targets column"
                f" '{targets}'.",
                INVALID_PARAMETER_VALUE,
            )
        if predictions is not None and predictions not in df.columns:
            raise MlflowException(
                f"The specified pandas DataFrame does not contain the specified predictions column"
                f" '{predictions}'.",
                INVALID_PARAMETER_VALUE,
            )
        # Keep these assignments ahead of the base initializer: it presumably calls
        # ``_compute_digest()`` when no digest is supplied, and that reads ``self._df``.
        self._df = df
        self._targets = targets
        self._predictions = predictions
        super().__init__(source=source, name=name, digest=digest)

    def _compute_digest(self) -> str:
        """
        Computes a digest for the dataset. Called if the user doesn't supply
        a digest when constructing the dataset.
        """
        return compute_pandas_digest(self._df)

    def to_dict(self) -> dict[str, str]:
        """Create config dictionary for the dataset.

        Starts from the base-class dictionary and adds two entries:

        - ``"schema"``: a JSON string of the form ``{"mlflow_colspec": ...}``, or ``None``
          when no schema is available for the DataFrame.
        - ``"profile"``: the JSON-serialized :py:attr:`profile`.

        The base-class entries are expected to be name, digest, source and source type.
        """
        schema = json.dumps({"mlflow_colspec": self.schema.to_dict()}) if self.schema else None
        config = super().to_dict()
        config.update(
            {
                "schema": schema,
                "profile": json.dumps(self.profile),
            }
        )
        return config

    @property
    def df(self) -> pd.DataFrame:
        """
        The underlying pandas DataFrame.
        """
        return self._df

    @property
    def source(self) -> DatasetSource:
        """
        The source of the dataset.
        """
        # ``_source`` is not assigned in this class; presumably set by the base initializer.
        return self._source

    @property
    def targets(self) -> Optional[str]:
        """
        The name of the target column. May be ``None`` if no target column is available.
        """
        return self._targets

    @property
    def predictions(self) -> Optional[str]:
        """
        The name of the predictions column. May be ``None`` if no predictions column is available.
        """
        return self._predictions

    @property
    def profile(self) -> Optional[Any]:
        """
        A profile of the dataset: a dictionary with ``num_rows`` (the number of rows in the
        DataFrame) and ``num_elements`` (the DataFrame's ``size``).
        """
        return {
            "num_rows": len(self._df),
            # ``int(...)`` yields a plain Python int, which ``json.dumps`` can serialize.
            "num_elements": int(self._df.size),
        }

    @cached_property
    def schema(self) -> Optional[Schema]:
        """
        An instance of :py:class:`mlflow.types.Schema` representing the tabular dataset. May be
        ``None`` if the schema cannot be inferred from the dataset.
        """
        # Inference is best-effort: any failure is logged and reported as "no schema"
        # instead of propagating. The result (including ``None``) is cached per instance.
        try:
            return _infer_schema(self._df)
        except Exception as e:
            _logger.warning("Failed to infer schema for Pandas dataset. Exception: %s", e)
            return None

    def to_pyfunc(self) -> PyFuncInputsOutputs:
        """
        Converts the dataset to a collection of pyfunc inputs and outputs for model
        evaluation. Required for use with mlflow.evaluate().

        When a targets column is configured, the inputs are the DataFrame without that
        column and the outputs are that column; otherwise the whole DataFrame is used as
        inputs and no outputs are supplied.
        """
        # Compare against ``None`` rather than relying on truthiness, matching the check in
        # ``__init__``: a falsy but valid column label (e.g. ``""`` or ``0``) must still be
        # split out as the outputs.
        if self._targets is None:
            return PyFuncInputsOutputs(self._df)
        inputs = self._df.drop(columns=[self._targets])
        outputs = self._df[self._targets]
        return PyFuncInputsOutputs(inputs, outputs)

    def to_evaluation_dataset(self, path=None, feature_names=None) -> EvaluationDataset:
        """
        Converts the dataset to an EvaluationDataset for model evaluation. Required
        for use with mlflow.evaluate().

        Args:
            path: Forwarded unchanged to ``EvaluationDataset``.
            feature_names: Forwarded unchanged to ``EvaluationDataset``.
        """
        return EvaluationDataset(
            data=self._df,
            targets=self._targets,
            path=path,
            feature_names=feature_names,
            predictions=self._predictions,
        )
def from_pandas(
    df: pd.DataFrame,
    source: Optional[Union[str, DatasetSource]] = None,
    targets: Optional[str] = None,
    name: Optional[str] = None,
    digest: Optional[str] = None,
    predictions: Optional[str] = None,
) -> PandasDataset:
    """
    Constructs a :py:class:`PandasDataset <mlflow.data.pandas_dataset.PandasDataset>` instance from
    a Pandas DataFrame, optional targets, optional predictions, and source.

    Args:
        df: A Pandas DataFrame.
        source: The source from which the DataFrame was derived, e.g. a filesystem
            path, an S3 URI, an HTTPS URL, a delta table name with version, or
            spark table etc. ``source`` may be specified as a URI, a path-like string,
            or an instance of
            :py:class:`DatasetSource <mlflow.data.dataset_source.DatasetSource>`.
            If unspecified, the source is assumed to be the code location
            (e.g. notebook cell, script, etc.) where
            :py:func:`from_pandas <mlflow.data.from_pandas>` is being called.
        targets: An optional target column name for supervised training. This column
            must be present in the dataframe (``df``).
        name: The name of the dataset. If unspecified, a name is generated.
        digest: The dataset digest (hash). If unspecified, a digest is computed
            automatically.
        predictions: An optional predictions column name for model evaluation. This column
            must be present in the dataframe (``df``).

    Returns:
        A ``PandasDataset`` wrapping ``df`` with the resolved source.

    Raises:
        MlflowException: If ``targets`` or ``predictions`` is specified but is not a
            column of ``df``.

    .. code-block:: python
        :test:
        :caption: Example

        import mlflow
        import pandas as pd

        x = pd.DataFrame(
            [["tom", 10, 1, 1], ["nick", 15, 0, 1], ["july", 14, 1, 1]],
            columns=["Name", "Age", "Label", "ModelOutput"],
        )
        dataset = mlflow.data.from_pandas(x, targets="Label", predictions="ModelOutput")
    """
    # Imported at call time rather than module level (presumably to avoid import cycles
    # within ``mlflow.data``; keep these local unless that is confirmed unnecessary).
    from mlflow.data.code_dataset_source import CodeDatasetSource
    from mlflow.data.dataset_source_registry import resolve_dataset_source
    from mlflow.tracking.context import registry

    if source is None:
        # No explicit source: describe the calling code location via the context tags.
        resolved_source = CodeDatasetSource(tags=registry.resolve_tags())
    elif isinstance(source, DatasetSource):
        # Already a DatasetSource instance; use it as-is.
        resolved_source = source
    else:
        # Any other value (e.g. a URI or path-like string) goes through the source registry.
        resolved_source = resolve_dataset_source(source)

    return PandasDataset(
        df=df,
        source=resolved_source,
        targets=targets,
        name=name,
        digest=digest,
        predictions=predictions,
    )