"""
The ``mlflow.diviner`` module provides an API for logging, saving, and loading ``diviner`` models.
Diviner wraps several popular open-source time series forecasting libraries in a unified API that
supports training, back-testing cross-validation, and forecasting inference for groups of related
series.
This module exports groups of univariate ``diviner`` models in the following formats:

Diviner format
    Serialized instance of a ``diviner`` model type using native diviner serializers
    (e.g., ``GroupedProphet`` or ``GroupedPmdarima``).

:py:mod:`mlflow.pyfunc`
    Produced for use by generic pyfunc-based deployment tools and for batch auditing
    of historical forecasts.
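
For example, a model can be logged and later loaded for generic inference through the pyfunc
flavor. This is a minimal sketch; ``model`` stands in for an already-fit Diviner instance
(e.g., a ``GroupedProphet``), and the artifact path is illustrative:

.. code-block:: python

    import mlflow
    import mlflow.diviner

    with mlflow.start_run():
        model_info = mlflow.diviner.log_model(model, "model")

    # Load as a generic pyfunc for batch inference
    loaded = mlflow.pyfunc.load_model(model_info.model_uri)
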
.. _Diviner:
https://databricks-diviner.readthedocs.io/en/latest/index.html
"""
import logging
import os
import pathlib
import shutil
from typing import Any, Optional
import pandas as pd
import yaml
import mlflow
from mlflow import pyfunc
from mlflow.environment_variables import MLFLOW_DFS_TMP
from mlflow.exceptions import MlflowException
from mlflow.models import Model, ModelInputExample, ModelSignature
from mlflow.models.model import MLMODEL_FILE_NAME
from mlflow.models.signature import _infer_signature_from_input_example
from mlflow.models.utils import _save_example
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.tracking._model_registry import DEFAULT_AWAIT_MAX_SLEEP_SECONDS
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils.docstring_utils import LOG_MODEL_PARAM_DOCS, format_docstring
from mlflow.utils.environment import (
_CONDA_ENV_FILE_NAME,
_CONSTRAINTS_FILE_NAME,
_PYTHON_ENV_FILE_NAME,
_REQUIREMENTS_FILE_NAME,
_mlflow_conda_env,
_process_conda_env,
_process_pip_requirements,
_PythonEnv,
_validate_env_arguments,
)
from mlflow.utils.file_utils import (
get_total_file_size,
shutil_copytree_without_file_permissions,
write_to,
)
from mlflow.utils.model_utils import (
_add_code_from_conf_to_system_path,
_get_flavor_configuration,
_get_flavor_configuration_from_uri,
_validate_and_copy_code_paths,
_validate_and_prepare_target_save_path,
)
from mlflow.utils.requirements_utils import _get_pinned_requirement
from mlflow.utils.uri import dbfs_hdfs_uri_to_fuse_path, generate_tmp_dfs_path
FLAVOR_NAME = "diviner"
_MODEL_BINARY_KEY = "data"
_MODEL_BINARY_FILE_NAME = "model.div"
_MODEL_TYPE_KEY = "model_type"
_FLAVOR_KEY = "flavors"
_SPARK_MODEL_INDICATOR = "fit_with_spark"
_logger = logging.getLogger(__name__)


def get_default_pip_requirements():
"""
Returns:
A list of default pip requirements for MLflow Models produced with the ``Diviner``
flavor. Calls to :py:func:`save_model()` and :py:func:`log_model()` produce a pip
environment that, at a minimum, contains these requirements.
"""
return [_get_pinned_requirement("diviner")]


def get_default_conda_env():
"""
Returns:
The default Conda environment for MLflow Models produced with the ``Diviner`` flavor
that is produced by calls to :py:func:`save_model()` and :py:func:`log_model()`.
"""
return _mlflow_conda_env(additional_pip_deps=get_default_pip_requirements())


@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def save_model(
diviner_model,
path,
conda_env=None,
code_paths=None,
mlflow_model=None,
signature: ModelSignature = None,
input_example: ModelInputExample = None,
pip_requirements=None,
extra_pip_requirements=None,
metadata=None,
**kwargs,
):
"""Save a ``Diviner`` model object to a path on the local file system.
Args:
diviner_model: ``Diviner`` model that has been ``fit`` on a grouped temporal
``DataFrame``.
path: Local path destination for the serialized model is to be saved.
conda_env: {{ conda_env }}
code_paths: {{ code_paths }}
        mlflow_model: :py:mod:`mlflow.models.Model` instance to which this flavor is being added.
signature: :py:class:`Model Signature <mlflow.models.ModelSignature>` describes model
input and output :py:class:`Schema <mlflow.types.Schema>`. The model
signature can be :py:func:`inferred <mlflow.models.infer_signature>`
from datasets with valid model input (e.g. the training dataset with target
column omitted) and valid model output (e.g. model predictions generated on
the training dataset), for example:
            .. code-block:: python

                import diviner
                from mlflow.models import infer_signature

                model = diviner.GroupedProphet().fit(data, ("region", "state"))
                predictions = model.predict(prediction_config)
                signature = infer_signature(data, predictions)
input_example: {{ input_example }}
pip_requirements: {{ pip_requirements }}
extra_pip_requirements: {{ extra_pip_requirements }}
metadata: {{ metadata }}
        kwargs: Optional configurations for Spark DataFrame storage, applicable if and only if
            the model has been fit in Spark. Currently supported options:

            - ``partition_by`` for setting one or several partition columns as a list of
              column names. Must be a list of strings of grouping key column(s).
            - ``partition_count`` for setting the number of part files to write from a
              repartition per ``partition_by`` group. The default part file count is 200.
            - ``dfs_tmpdir`` for specifying the DFS temporary location where the model will
              be stored while copying from a local file system to a Spark-supported "dbfs:/"
              scheme.
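
            For example, a minimal sketch of saving a Spark-fit model with storage options
            (the save path, column names, and option values shown are illustrative):

            .. code-block:: python

                mlflow.diviner.save_model(
                    diviner_model=model,
                    path="/dbfs/tmp/diviner/grouped_prophet",
                    partition_by=["region", "state"],
                    partition_count=20,
                    dfs_tmpdir="dbfs:/tmp/diviner_staging",
                )
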
"""
import diviner
_validate_env_arguments(conda_env, pip_requirements, extra_pip_requirements)
path = pathlib.Path(path).absolute()
_validate_and_prepare_target_save_path(str(path))
# NB: When moving to native pathlib implementations, path encoding as string will not be needed.
code_dir_subpath = _validate_and_copy_code_paths(code_paths, str(path))
if mlflow_model is None:
mlflow_model = Model()
saved_example = _save_example(mlflow_model, input_example, str(path))
if signature is None and saved_example is not None:
wrapped_model = _DivinerModelWrapper(diviner_model)
signature = _infer_signature_from_input_example(saved_example, wrapped_model)
if signature is not None:
mlflow_model.signature = signature
if metadata is not None:
mlflow_model.metadata = metadata
fit_with_spark = _save_diviner_model(diviner_model, path, **kwargs)
flavor_conf = {_SPARK_MODEL_INDICATOR: fit_with_spark}
model_bin_kwargs = {_MODEL_BINARY_KEY: _MODEL_BINARY_FILE_NAME}
pyfunc.add_to_model(
mlflow_model,
loader_module="mlflow.diviner",
conda_env=_CONDA_ENV_FILE_NAME,
python_env=_PYTHON_ENV_FILE_NAME,
code=code_dir_subpath,
**model_bin_kwargs,
)
flavor_conf.update({_MODEL_TYPE_KEY: diviner_model.__class__.__name__}, **model_bin_kwargs)
mlflow_model.add_flavor(
FLAVOR_NAME, diviner_version=diviner.__version__, code=code_dir_subpath, **flavor_conf
)
if size := get_total_file_size(path):
mlflow_model.model_size_bytes = size
mlflow_model.save(str(path.joinpath(MLMODEL_FILE_NAME)))
if conda_env is None:
if pip_requirements is None:
default_reqs = get_default_pip_requirements()
inferred_reqs = mlflow.models.infer_pip_requirements(
str(path), FLAVOR_NAME, fallback=default_reqs
)
default_reqs = sorted(set(inferred_reqs).union(default_reqs))
else:
default_reqs = None
conda_env, pip_requirements, pip_constraints = _process_pip_requirements(
default_reqs, pip_requirements, extra_pip_requirements
)
else:
conda_env, pip_requirements, pip_constraints = _process_conda_env(conda_env)
with path.joinpath(_CONDA_ENV_FILE_NAME).open("w") as f:
yaml.safe_dump(conda_env, stream=f, default_flow_style=False)
if pip_constraints:
write_to(str(path.joinpath(_CONSTRAINTS_FILE_NAME)), "\n".join(pip_constraints))
write_to(str(path.joinpath(_REQUIREMENTS_FILE_NAME)), "\n".join(pip_requirements))
_PythonEnv.current().to_yaml(str(path.joinpath(_PYTHON_ENV_FILE_NAME)))


def _save_diviner_model(diviner_model, path, **kwargs) -> bool:
"""
    Saves a Diviner model to the specified path. If the model was fit using a Pandas DataFrame
    as the training data submitted to ``fit``, the Diviner model object is saved directly.
    If the Diviner model was fit using a Spark DataFrame, the model components are saved
    separately: the metadata and ancillary files (JSON and Pandas DataFrames) are written
    directly to a FUSE mount location, while the Spark DataFrame containing the individual
    serialized Diviner model objects is written using a "dbfs:"-scheme path that Spark
    recognizes.
"""
save_path = str(path.joinpath(_MODEL_BINARY_FILE_NAME))
if getattr(diviner_model, "_fit_with_spark", False):
        # Validate that the path is an absolute path early in order to fail fast, prior to
        # attempting to write the (large) DataFrame to a tmp DFS path, rather than raising a
        # path validation Exception within MLflow when attempting to copy the temporary write
        # files from DFS to the file system path provided.
        if not os.path.isabs(path):
            raise MlflowException(
                "The save path provided must be an absolute path. "
                f"The path submitted, '{path}', is a relative path."
            )
# Create a temporary DFS location to write the Spark DataFrame containing the models to.
tmp_path = generate_tmp_dfs_path(kwargs.get("dfs_tmpdir", MLFLOW_DFS_TMP.get()))
# Save the model Spark DataFrame to the temporary DFS location
diviner_model._save_model_df_to_path(tmp_path, **kwargs)
diviner_data_path = os.path.abspath(save_path)
tmp_fuse_path = dbfs_hdfs_uri_to_fuse_path(tmp_path)
shutil.move(src=tmp_fuse_path, dst=diviner_data_path)
# Save the model metadata to the path location
diviner_model._save_model_metadata_components_to_path(path=diviner_data_path)
return True
diviner_model.save(save_path)
return False


def _load_model_fit_in_spark(local_model_path: str, flavor_conf, **kwargs):
"""
Loads a Diviner model that has been fit (and saved) in the Spark variant.
"""
# NB: To load the model DataFrame (which is a Spark DataFrame), Spark requires that the file
# partitions are in DFS. In order to facilitate this, the model DataFrame (saved as parquet)
# will be copied to a temporary DFS location. The remaining files can be read directly from
# the local file system path, which is handled within the Diviner APIs.
import diviner
dfs_temp_directory = generate_tmp_dfs_path(kwargs.get("dfs_tmpdir", MLFLOW_DFS_TMP.get()))
dfs_fuse_directory = dbfs_hdfs_uri_to_fuse_path(dfs_temp_directory)
os.makedirs(dfs_fuse_directory)
shutil_copytree_without_file_permissions(src_dir=local_model_path, dst_dir=dfs_fuse_directory)
diviner_instance = getattr(diviner, flavor_conf[_MODEL_TYPE_KEY])
load_directory = os.path.join(dfs_fuse_directory, flavor_conf[_MODEL_BINARY_KEY])
return diviner_instance.load(load_directory)


def load_model(model_uri, dst_path=None, **kwargs):
"""Load a ``Diviner`` object from a local file or a run.
Args:
model_uri: The location, in URI format, of the MLflow model. For example:
- ``/Users/me/path/to/local/model``
- ``relative/path/to/local/model``
- ``s3://my_bucket/path/to/model``
- ``runs:/<mlflow_run_id>/run-relative/path/to/model``
- ``mlflow-artifacts:/path/to/model``
For more information about supported URI schemes, see
`Referencing Artifacts <https://www.mlflow.org/docs/latest/tracking.html#
artifact-locations>`_.
dst_path: The local filesystem path to which to download the model artifact.
This directory must already exist if provided. If unspecified, a local output
path will be created.
        kwargs: Optional configuration options for loading a Diviner model. For models that
            have been fit and saved using Spark, the keyword argument ``dfs_tmpdir`` may be
            used to specify the DFS temporary directory that the model is staged in during
            loading.
Returns:
A ``Diviner`` model instance.
"""
model_uri = str(model_uri)
flavor_conf = _get_flavor_configuration_from_uri(model_uri, FLAVOR_NAME, _logger)
local_model_path = _download_artifact_from_uri(artifact_uri=model_uri, output_path=dst_path)
_add_code_from_conf_to_system_path(local_model_path, flavor_conf)
if flavor_conf.get(_SPARK_MODEL_INDICATOR, False):
return _load_model_fit_in_spark(local_model_path, flavor_conf, **kwargs)
return _load_model(local_model_path, flavor_conf)


def _load_model(path, flavor_conf):
"""
Loads a Diviner model instance that was not fit using Spark from a file system location.
"""
import diviner
local_path = pathlib.Path(path)
diviner_model_path = local_path.joinpath(
flavor_conf.get(_MODEL_BINARY_KEY, _MODEL_BINARY_FILE_NAME)
)
diviner_instance = getattr(diviner, flavor_conf[_MODEL_TYPE_KEY])
return diviner_instance.load(str(diviner_model_path))


def _load_pyfunc(path):
local_path = pathlib.Path(path)
# NB: reverting the dir walk that happens with pyfunc's loading implementation
if local_path.is_file():
local_path = local_path.parent
flavor_conf = _get_flavor_configuration(local_path, FLAVOR_NAME)
if flavor_conf.get(_SPARK_MODEL_INDICATOR):
raise MlflowException(
"The model being loaded was fit in Spark. Diviner models fit in "
"Spark do not support loading as pyfunc."
)
return _DivinerModelWrapper(_load_model(local_path, flavor_conf))


@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def log_model(
diviner_model,
artifact_path,
conda_env=None,
code_paths=None,
registered_model_name=None,
signature: ModelSignature = None,
input_example: ModelInputExample = None,
await_registration_for=DEFAULT_AWAIT_MAX_SLEEP_SECONDS,
pip_requirements=None,
extra_pip_requirements=None,
metadata=None,
**kwargs,
):
"""Log a ``Diviner`` object as an MLflow artifact for the current run.
Args:
diviner_model: ``Diviner`` model that has been ``fit`` on a grouped temporal ``DataFrame``.
artifact_path: Run-relative artifact path to save the model instance to.
conda_env: {{ conda_env }}
code_paths: {{ code_paths }}
registered_model_name: This argument may change or be removed in a
future release without warning. If given, create a model
version under ``registered_model_name``, also creating a
registered model if one with the given name does not exist.
signature: :py:class:`Model Signature <mlflow.models.ModelSignature>` describes model
input and output :py:class:`Schema <mlflow.types.Schema>`. The model
signature can be :py:func:`inferred <mlflow.models.infer_signature>`
from datasets with valid model input (e.g. the training dataset with target
column omitted) and valid model output (e.g. model predictions generated on
the training dataset), for example:
            .. code-block:: python
                :caption: Example

                from pmdarima.arima import AutoARIMA
                from diviner import GroupedPmdarima
                from mlflow.models import infer_signature

                auto_arima_obj = AutoARIMA(out_of_sample_size=60, maxiter=100)
                base_auto_arima = GroupedPmdarima(model_template=auto_arima_obj).fit(
                    df=training_data,
                    group_key_columns=("region", "state"),
                    y_col="y",
                    datetime_col="ds",
                    silence_warnings=True,
                )
                predictions = base_auto_arima.predict(
                    n_periods=30, alpha=0.05, return_conf_int=True
                )
                signature = infer_signature(training_data, predictions)
input_example: {{ input_example }}
        await_registration_for: Number of seconds to wait for the model version
            to finish being created and to reach ``READY`` status.
By default, the function waits for five minutes.
Specify 0 or None to skip waiting.
pip_requirements: {{ pip_requirements }}
extra_pip_requirements: {{ extra_pip_requirements }}
metadata: {{ metadata }}
        kwargs: Additional arguments for :py:class:`mlflow.models.model.Model`.
            Additionally, for models that have been fit in Spark, the following storage
            configuration options are available:

            - ``partition_by`` for setting one or several partition columns as a list of
              column names. Must be a list of strings of grouping key column(s).
            - ``partition_count`` for setting the number of part files to write from a
              repartition per ``partition_by`` group. The default part file count is 200.
            - ``dfs_tmpdir`` for specifying the DFS temporary location where the model will
              be stored while copying from a local file system to a Spark-supported "dbfs:/"
              scheme.
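
            For example, a minimal logging sketch (``model`` stands in for an already-fit
            Diviner instance; the artifact path is illustrative):

            .. code-block:: python

                import mlflow

                with mlflow.start_run():
                    model_info = mlflow.diviner.log_model(model, "diviner_model")
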
Returns:
A :py:class:`ModelInfo <mlflow.models.model.ModelInfo>` instance that contains the
metadata of the logged model.
"""
return Model.log(
artifact_path=artifact_path,
flavor=mlflow.diviner,
registered_model_name=registered_model_name,
diviner_model=diviner_model,
conda_env=conda_env,
code_paths=code_paths,
signature=signature,
input_example=input_example,
await_registration_for=await_registration_for,
pip_requirements=pip_requirements,
extra_pip_requirements=extra_pip_requirements,
metadata=metadata,
**kwargs,
)


class _DivinerModelWrapper:
def __init__(self, diviner_model):
self.diviner_model = diviner_model

    def get_raw_model(self):
"""
Returns the underlying model.
"""
return self.diviner_model

    def predict(self, dataframe, params: Optional[dict[str, Any]] = None) -> pd.DataFrame:
"""A method that allows a pyfunc implementation of this flavor to generate forecasted values
from the end of a trained Diviner model's training series per group.
The implementation here encapsulates a config-based switch of method calling. In short:
* If the ``DataFrame`` supplied to this method contains a column ``groups`` whose
first row of data is of type List[tuple[str]] (containing the series-identifying
group keys that were generated to identify a single underlying model during training),
the caller will resolve to the method ``predict_groups()`` in each of the underlying
wrapped libraries (i.e., ``GroupedProphet.predict_groups()``).
        * If the ``DataFrame`` supplied does not contain the column name ``groups``, the
          primitive-driven forecasting method is used instead. (For ``GroupedProphet``, the
          ``predict()`` method mirrors ``Prophet``'s own, requiring a ``DataFrame`` submitted
          with explicit datetime values per group, which is not tenable for pyfunc or RESTful
          serving.) For ``GroupedProphet`` this is the ``.forecast()`` method, while for
          ``GroupedPmdarima`` it is the ``.predict()`` method.
Args:
dataframe: A ``pandas.DataFrame`` that contains the required configuration for the
appropriate ``Diviner`` type.
                For example, for ``GroupedProphet.forecast()``:

                - horizon: int
                - frequency: str

                .. code-block:: python

                    predict_conf = pd.DataFrame({"horizon": 30, "frequency": "D"}, index=[0])
                    forecast = pyfunc.load_pyfunc(model_uri=model_path).predict(predict_conf)

                This will generate 30 days of forecasted values for each group that the model
                was trained on.
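
                To restrict forecasting to a subset of trained groups, a ``groups`` column
                containing a list of group-key tuples may be supplied. A sketch with
                illustrative group key values (they must match keys used during training):

                .. code-block:: python

                    predict_conf = pd.DataFrame(
                        {
                            "groups": [[("northeast", "NY"), ("southwest", "AZ")]],
                            "horizon": 30,
                            "frequency": "D",
                        },
                        index=[0],
                    )
                    subset_forecast = pyfunc.load_pyfunc(model_uri=model_path).predict(
                        predict_conf
                    )
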
params: Additional parameters to pass to the model for inference.
Returns:
A Pandas DataFrame containing the forecasted values for each group key that was
either trained or declared as a subset with a ``groups`` entry in the ``dataframe``
configuration argument.
"""
from diviner import GroupedPmdarima, GroupedProphet
schema = dataframe.columns.values.tolist()
conf = dataframe.to_dict(orient="index").get(0)
# required parameter extraction and validation
horizon = conf.get("horizon", None)
n_periods = conf.get("n_periods", None)
        if n_periods and horizon and n_periods != horizon:
            raise MlflowException(
                "The provided prediction configuration contains both `n_periods` and `horizon` "
                "with different values. Please provide only one of these integer values.",
                error_code=INVALID_PARAMETER_VALUE,
            )
        if not n_periods and horizon:
            n_periods = horizon
if not n_periods:
raise MlflowException(
"The provided prediction configuration Pandas DataFrame does not contain either "
"the `n_periods` or `horizon` columns. At least one of these must be specified "
f"with a valid integer value. Configuration schema: {schema}.",
error_code=INVALID_PARAMETER_VALUE,
)
if not isinstance(n_periods, int):
raise MlflowException(
"The `n_periods` column contains invalid data. Supplied type must be an integer. "
f"Type supplied: {type(n_periods)}",
error_code=INVALID_PARAMETER_VALUE,
)
frequency = conf.get("frequency", None)
if isinstance(self.diviner_model, GroupedProphet) and not frequency:
raise MlflowException(
"Diviner's GroupedProphet model requires a `frequency` value to be submitted in "
"Pandas date_range format. The submitted configuration Pandas DataFrame does not "
f"contain a `frequency` column. Configuration schema: {schema}.",
error_code=INVALID_PARAMETER_VALUE,
)
predict_col = conf.get("predict_col", None)
predict_groups = conf.get("groups", None)
if predict_groups and not isinstance(predict_groups, list):
raise MlflowException(
"Specifying a group subset for prediction requires groups to be defined as a "
f"[List[(Tuple|List)[<group_keys>]]. Submitted group type: {type(predict_groups)}.",
error_code=INVALID_PARAMETER_VALUE,
)
# NB: json serialization of a tuple converts the tuple to a List. Diviner requires a
# List of Tuples to be input to the group_prediction API. This conversion is for utilizing
# the pyfunc flavor through the serving API.
if predict_groups and not isinstance(predict_groups[0], tuple):
predict_groups = [tuple(group) for group in predict_groups]
if isinstance(self.diviner_model, GroupedProphet):
# We're wrapping two different endpoints to Diviner here for the pyfunc implementation.
# Since we're limited by a single endpoint, we can address redirecting to the
# method ``predict_groups()`` which will allow for a subset of groups to be forecasted
            # if the prediction configuration DataFrame contains a List[tuple[str]] in the
# ``groups`` column. If this column is not present, all groups will be used to generate
# forecasts, utilizing the less computationally complex method ``forecast``.
if not predict_groups:
prediction_df = self.diviner_model.forecast(horizon=n_periods, frequency=frequency)
else:
group_kwargs = {k: v for k, v in conf.items() if k in {"predict_col", "on_error"}}
prediction_df = self.diviner_model.predict_groups(
groups=predict_groups, horizon=n_periods, frequency=frequency, **group_kwargs
)
if predict_col is not None:
prediction_df.rename(columns={"yhat": predict_col}, inplace=True)
elif isinstance(self.diviner_model, GroupedPmdarima):
# As above, we're redirecting the prediction request to one of two different methods
# for ``Diviner``'s pmdarima implementation. If the ``groups`` column is present with
# a list of tuples of keys to lookup, ``predict_groups()`` will be used. Otherwise,
# the standard ``predict()`` method will be called to generate forecasts for all groups
# that were trained on.
restricted_keys = {"n_periods", "horizon", "frequency", "groups"}
predict_conf = {k: v for k, v in conf.items() if k not in restricted_keys}
if not predict_groups:
prediction_df = self.diviner_model.predict(n_periods=n_periods, **predict_conf)
else:
prediction_df = self.diviner_model.predict_groups(
groups=predict_groups, n_periods=n_periods, **predict_conf
)
else:
raise MlflowException(
f"The Diviner model instance type '{type(self.diviner_model)}' is not supported "
f"in version {mlflow.__version__} of MLflow.",
error_code=INVALID_PARAMETER_VALUE,
)
return prediction_df