Source code for mlflow.diviner

"""
The ``mlflow.diviner`` module provides an API for logging, saving and loading ``diviner`` models.
Diviner wraps several popular open source time series forecasting libraries in a unified API that
permits training, back-testing cross-validation, and forecasting inference for groups of related
series.
This module exports groups of univariate ``diviner`` models in the following formats:

Diviner format
    Serialized instance of a ``diviner`` model type using native diviner serializers.
    (e.g., "GroupedProphet" or "GroupedPmdarima")
:py:mod:`mlflow.pyfunc`
    Produced for use by generic pyfunc-based deployment tools and for batch auditing
    of historical forecasts.

.. _Diviner:
    https://databricks-diviner.readthedocs.io/en/latest/index.html
"""

import logging
import os
import pathlib
import shutil
from typing import Any, Optional

import pandas as pd
import yaml

import mlflow
from mlflow import pyfunc
from mlflow.environment_variables import MLFLOW_DFS_TMP
from mlflow.exceptions import MlflowException
from mlflow.models import Model, ModelInputExample, ModelSignature
from mlflow.models.model import MLMODEL_FILE_NAME
from mlflow.models.signature import _infer_signature_from_input_example
from mlflow.models.utils import _save_example
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.tracking._model_registry import DEFAULT_AWAIT_MAX_SLEEP_SECONDS
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils.docstring_utils import LOG_MODEL_PARAM_DOCS, format_docstring
from mlflow.utils.environment import (
    _CONDA_ENV_FILE_NAME,
    _CONSTRAINTS_FILE_NAME,
    _PYTHON_ENV_FILE_NAME,
    _REQUIREMENTS_FILE_NAME,
    _mlflow_conda_env,
    _process_conda_env,
    _process_pip_requirements,
    _PythonEnv,
    _validate_env_arguments,
)
from mlflow.utils.file_utils import (
    get_total_file_size,
    shutil_copytree_without_file_permissions,
    write_to,
)
from mlflow.utils.model_utils import (
    _add_code_from_conf_to_system_path,
    _get_flavor_configuration,
    _get_flavor_configuration_from_uri,
    _validate_and_copy_code_paths,
    _validate_and_prepare_target_save_path,
)
from mlflow.utils.requirements_utils import _get_pinned_requirement
from mlflow.utils.uri import dbfs_hdfs_uri_to_fuse_path, generate_tmp_dfs_path

FLAVOR_NAME = "diviner"
_MODEL_BINARY_KEY = "data"
_MODEL_BINARY_FILE_NAME = "model.div"
_MODEL_TYPE_KEY = "model_type"
_FLAVOR_KEY = "flavors"
_SPARK_MODEL_INDICATOR = "fit_with_spark"

_logger = logging.getLogger(__name__)


def get_default_pip_requirements():
    """
    Returns:
        A list of default pip requirements for MLflow Models produced with the ``Diviner``
        flavor. Calls to :py:func:`save_model()` and :py:func:`log_model()` produce a pip
        environment that, at a minimum, contains these requirements.
    """
    return [_get_pinned_requirement("diviner")]

def get_default_conda_env():
    """
    Returns:
        The default Conda environment for MLflow Models produced with the ``Diviner`` flavor
        that is produced by calls to :py:func:`save_model()` and :py:func:`log_model()`.
    """
    return _mlflow_conda_env(additional_pip_deps=get_default_pip_requirements())

@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def save_model(
    diviner_model,
    path,
    conda_env=None,
    code_paths=None,
    mlflow_model=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
    **kwargs,
):
    """Save a ``Diviner`` model object to a path on the local file system.

    Args:
        diviner_model: ``Diviner`` model that has been ``fit`` on a grouped temporal
            ``DataFrame``.
        path: Local path destination where the serialized model is to be saved.
        conda_env: {{ conda_env }}
        code_paths: {{ code_paths }}
        mlflow_model: :py:mod:`mlflow.models.Model` instance to which this flavor is being
            added.
        signature: :py:class:`Model Signature <mlflow.models.ModelSignature>` describes model
            input and output :py:class:`Schema <mlflow.types.Schema>`. The model signature can
            be :py:func:`inferred <mlflow.models.infer_signature>` from datasets with valid
            model input (e.g. the training dataset with target column omitted) and valid model
            output (e.g. model predictions generated on the training dataset), for example:

            .. code-block:: python

                from mlflow.models import infer_signature

                model = diviner.GroupedProphet().fit(data, ("region", "state"))
                predictions = model.predict(prediction_config)
                signature = infer_signature(data, predictions)

        input_example: {{ input_example }}
        pip_requirements: {{ pip_requirements }}
        extra_pip_requirements: {{ extra_pip_requirements }}
        metadata: {{ metadata }}
        kwargs: Optional configurations for Spark DataFrame storage if the model has been fit
            in Spark. Currently supported options:

            - ``partition_by`` for setting one (or several) partition columns as a list of
              column names. Must be a list of strings of grouping key column(s).
            - ``partition_count`` for setting the number of part files to write from a
              repartition per ``partition_by`` group. The default part file count is 200.
            - ``dfs_tmpdir`` for specifying the DFS temporary location where the model will
              be stored while copying from a local file system to a Spark-supported "dbfs:/"
              scheme.
    """
    import diviner

    _validate_env_arguments(conda_env, pip_requirements, extra_pip_requirements)

    path = pathlib.Path(path).absolute()

    _validate_and_prepare_target_save_path(str(path))

    # NB: When moving to native pathlib implementations, path encoding as string will not be
    # needed.
    code_dir_subpath = _validate_and_copy_code_paths(code_paths, str(path))

    if mlflow_model is None:
        mlflow_model = Model()
    saved_example = _save_example(mlflow_model, input_example, str(path))

    if signature is None and saved_example is not None:
        wrapped_model = _DivinerModelWrapper(diviner_model)
        signature = _infer_signature_from_input_example(saved_example, wrapped_model)

    if signature is not None:
        mlflow_model.signature = signature
    if metadata is not None:
        mlflow_model.metadata = metadata

    fit_with_spark = _save_diviner_model(diviner_model, path, **kwargs)

    flavor_conf = {_SPARK_MODEL_INDICATOR: fit_with_spark}

    model_bin_kwargs = {_MODEL_BINARY_KEY: _MODEL_BINARY_FILE_NAME}
    pyfunc.add_to_model(
        mlflow_model,
        loader_module="mlflow.diviner",
        conda_env=_CONDA_ENV_FILE_NAME,
        python_env=_PYTHON_ENV_FILE_NAME,
        code=code_dir_subpath,
        **model_bin_kwargs,
    )

    flavor_conf.update({_MODEL_TYPE_KEY: diviner_model.__class__.__name__}, **model_bin_kwargs)
    mlflow_model.add_flavor(
        FLAVOR_NAME, diviner_version=diviner.__version__, code=code_dir_subpath, **flavor_conf
    )
    if size := get_total_file_size(path):
        mlflow_model.model_size_bytes = size
    mlflow_model.save(str(path.joinpath(MLMODEL_FILE_NAME)))

    if conda_env is None:
        if pip_requirements is None:
            default_reqs = get_default_pip_requirements()
            inferred_reqs = mlflow.models.infer_pip_requirements(
                str(path), FLAVOR_NAME, fallback=default_reqs
            )
            default_reqs = sorted(set(inferred_reqs).union(default_reqs))
        else:
            default_reqs = None
        conda_env, pip_requirements, pip_constraints = _process_pip_requirements(
            default_reqs, pip_requirements, extra_pip_requirements
        )
    else:
        conda_env, pip_requirements, pip_constraints = _process_conda_env(conda_env)

    with path.joinpath(_CONDA_ENV_FILE_NAME).open("w") as f:
        yaml.safe_dump(conda_env, stream=f, default_flow_style=False)

    if pip_constraints:
        write_to(str(path.joinpath(_CONSTRAINTS_FILE_NAME)), "\n".join(pip_constraints))

    write_to(str(path.joinpath(_REQUIREMENTS_FILE_NAME)), "\n".join(pip_requirements))

    _PythonEnv.current().to_yaml(str(path.joinpath(_PYTHON_ENV_FILE_NAME)))

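
# A minimal local save sketch under assumptions (``df`` is a hypothetical
# grouped training DataFrame): the serialized model lands at ``path`` along
# with the MLmodel, environment, and requirements files written above.
#
#   import diviner
#   import mlflow.diviner
#
#   model = diviner.GroupedProphet().fit(df, ("region", "state"))
#   mlflow.diviner.save_model(diviner_model=model, path="/tmp/diviner_model")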
def _save_diviner_model(diviner_model, path, **kwargs) -> bool:
    """
    Saves a Diviner model to the specified path. If the model was fit by using a Pandas
    DataFrame for the training data submitted to `fit`, directly save the Diviner model
    object. If the Diviner model was fit by using a Spark DataFrame, save the model components
    separately. The metadata and ancillary files to write (JSON and Pandas DataFrames) are
    written directly to a fuse mount location, while the Spark DataFrame that contains the
    individual serialized Diviner model objects is written by using a 'dbfs:' scheme path
    that Spark recognizes.
    """
    save_path = str(path.joinpath(_MODEL_BINARY_FILE_NAME))
    if getattr(diviner_model, "_fit_with_spark", False):
        # Validate that the path is an absolute path early in order to fail fast prior to
        # attempting to write the (large) DataFrame to a tmp DFS path first and raise a path
        # validation Exception within MLflow when attempting to copy the temporary write files
        # from DFS to the file system path provided.
        if not os.path.isabs(path):
            raise MlflowException(
                "The save path provided must be an absolute path. "
                f"The path submitted, '{path}', is not an absolute path."
            )
        # Create a temporary DFS location to write the Spark DataFrame containing the models to.
        tmp_path = generate_tmp_dfs_path(kwargs.get("dfs_tmpdir", MLFLOW_DFS_TMP.get()))
        # Save the model Spark DataFrame to the temporary DFS location
        diviner_model._save_model_df_to_path(tmp_path, **kwargs)

        diviner_data_path = os.path.abspath(save_path)
        tmp_fuse_path = dbfs_hdfs_uri_to_fuse_path(tmp_path)
        shutil.move(src=tmp_fuse_path, dst=diviner_data_path)

        # Save the model metadata to the path location
        diviner_model._save_model_metadata_components_to_path(path=diviner_data_path)
        return True

    diviner_model.save(save_path)
    return False


def _load_model_fit_in_spark(local_model_path: str, flavor_conf, **kwargs):
    """
    Loads a Diviner model that has been fit (and saved) in the Spark variant.
    """
    # NB: To load the model DataFrame (which is a Spark DataFrame), Spark requires that the
    # file partitions are in DFS. In order to facilitate this, the model DataFrame (saved as
    # parquet) will be copied to a temporary DFS location. The remaining files can be read
    # directly from the local file system path, which is handled within the Diviner APIs.
    import diviner

    dfs_temp_directory = generate_tmp_dfs_path(kwargs.get("dfs_tmpdir", MLFLOW_DFS_TMP.get()))
    dfs_fuse_directory = dbfs_hdfs_uri_to_fuse_path(dfs_temp_directory)
    os.makedirs(dfs_fuse_directory)
    shutil_copytree_without_file_permissions(src_dir=local_model_path, dst_dir=dfs_fuse_directory)

    diviner_instance = getattr(diviner, flavor_conf[_MODEL_TYPE_KEY])
    load_directory = os.path.join(dfs_fuse_directory, flavor_conf[_MODEL_BINARY_KEY])

    return diviner_instance.load(load_directory)

def load_model(model_uri, dst_path=None, **kwargs):
    """Load a ``Diviner`` object from a local file or a run.

    Args:
        model_uri: The location, in URI format, of the MLflow model. For example:

            - ``/Users/me/path/to/local/model``
            - ``relative/path/to/local/model``
            - ``s3://my_bucket/path/to/model``
            - ``runs:/<mlflow_run_id>/run-relative/path/to/model``
            - ``mlflow-artifacts:/path/to/model``

            For more information about supported URI schemes, see
            `Referencing Artifacts <https://www.mlflow.org/docs/latest/tracking.html#
            artifact-locations>`_.
        dst_path: The local filesystem path to which to download the model artifact. This
            directory must already exist if provided. If unspecified, a local output path will
            be created.
        kwargs: Optional configuration options for loading of a Diviner model. For models that
            have been fit and saved using Spark, if a specific DFS temporary directory is
            desired for loading of Diviner models, use the keyword argument ``"dfs_tmpdir"``
            to define the temporary path used for the model during loading.

    Returns:
        A ``Diviner`` model instance.
    """
    model_uri = str(model_uri)
    flavor_conf = _get_flavor_configuration_from_uri(model_uri, FLAVOR_NAME, _logger)
    local_model_path = _download_artifact_from_uri(artifact_uri=model_uri, output_path=dst_path)
    _add_code_from_conf_to_system_path(local_model_path, flavor_conf)

    if flavor_conf.get(_SPARK_MODEL_INDICATOR, False):
        return _load_model_fit_in_spark(local_model_path, flavor_conf, **kwargs)

    return _load_model(local_model_path, flavor_conf)

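
# Loading sketch (URIs are hypothetical): a model fit with Pandas loads
# directly, while a model fit in Spark honors the optional ``dfs_tmpdir``
# keyword documented above.
#
#   model = mlflow.diviner.load_model("runs:/<mlflow_run_id>/diviner_model")
#   spark_model = mlflow.diviner.load_model(
#       "runs:/<mlflow_run_id>/diviner_model", dfs_tmpdir="dbfs:/tmp/diviner_load"
#   )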
def _load_model(path, flavor_conf):
    """
    Loads a Diviner model instance that was not fit using Spark from a file system location.
    """
    import diviner

    local_path = pathlib.Path(path)

    diviner_model_path = local_path.joinpath(
        flavor_conf.get(_MODEL_BINARY_KEY, _MODEL_BINARY_FILE_NAME)
    )

    diviner_instance = getattr(diviner, flavor_conf[_MODEL_TYPE_KEY])

    return diviner_instance.load(str(diviner_model_path))


def _load_pyfunc(path):
    local_path = pathlib.Path(path)

    # NB: reverting the dir walk that happens with pyfunc's loading implementation
    if local_path.is_file():
        local_path = local_path.parent

    flavor_conf = _get_flavor_configuration(local_path, FLAVOR_NAME)

    if flavor_conf.get(_SPARK_MODEL_INDICATOR):
        raise MlflowException(
            "The model being loaded was fit in Spark. Diviner models fit in "
            "Spark do not support loading as pyfunc."
        )

    return _DivinerModelWrapper(_load_model(local_path, flavor_conf))

@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def log_model(
    diviner_model,
    artifact_path,
    conda_env=None,
    code_paths=None,
    registered_model_name=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    await_registration_for=DEFAULT_AWAIT_MAX_SLEEP_SECONDS,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
    **kwargs,
):
    """Log a ``Diviner`` object as an MLflow artifact for the current run.

    Args:
        diviner_model: ``Diviner`` model that has been ``fit`` on a grouped temporal
            ``DataFrame``.
        artifact_path: Run-relative artifact path to save the model instance to.
        conda_env: {{ conda_env }}
        code_paths: {{ code_paths }}
        registered_model_name: This argument may change or be removed in a future release
            without warning. If given, create a model version under ``registered_model_name``,
            also creating a registered model if one with the given name does not exist.
        signature: :py:class:`Model Signature <mlflow.models.ModelSignature>` describes model
            input and output :py:class:`Schema <mlflow.types.Schema>`. The model signature can
            be :py:func:`inferred <mlflow.models.infer_signature>` from datasets with valid
            model input (e.g. the training dataset with target column omitted) and valid model
            output (e.g. model predictions generated on the training dataset), for example:

            .. code-block:: python
                :caption: Example

                from pmdarima import AutoARIMA
                from diviner import GroupedPmdarima
                from mlflow.models import infer_signature

                auto_arima_obj = AutoARIMA(out_of_sample_size=60, maxiter=100)
                base_auto_arima = GroupedPmdarima(model_template=auto_arima_obj).fit(
                    df=training_data,
                    group_key_columns=("region", "state"),
                    y_col="y",
                    datetime_col="ds",
                    silence_warnings=True,
                )
                predictions = base_auto_arima.predict(
                    n_periods=30, alpha=0.05, return_conf_int=True
                )
                signature = infer_signature(training_data, predictions)

        input_example: {{ input_example }}
        await_registration_for: Number of seconds to wait for the model version to finish
            being created and to reach the ``READY`` status. By default, the function waits
            for five minutes. Specify 0 or None to skip waiting.
        pip_requirements: {{ pip_requirements }}
        extra_pip_requirements: {{ extra_pip_requirements }}
        metadata: {{ metadata }}
        kwargs: Additional arguments for :py:class:`mlflow.models.model.Model`. Additionally,
            for models that have been fit in Spark, the following configuration options are
            currently supported:

            - ``partition_by`` for setting one (or several) partition columns as a list of
              column names. Must be a list of strings of grouping key column(s).
            - ``partition_count`` for setting the number of part files to write from a
              repartition per ``partition_by`` group. The default part file count is 200.
            - ``dfs_tmpdir`` for specifying the DFS temporary location where the model will
              be stored while copying from a local file system to a Spark-supported "dbfs:/"
              scheme.

    Returns:
        A :py:class:`ModelInfo <mlflow.models.model.ModelInfo>` instance that contains the
        metadata of the logged model.
    """
    return Model.log(
        artifact_path=artifact_path,
        flavor=mlflow.diviner,
        registered_model_name=registered_model_name,
        diviner_model=diviner_model,
        conda_env=conda_env,
        code_paths=code_paths,
        signature=signature,
        input_example=input_example,
        await_registration_for=await_registration_for,
        pip_requirements=pip_requirements,
        extra_pip_requirements=extra_pip_requirements,
        metadata=metadata,
        **kwargs,
    )

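
# Logging sketch (the run context, ``model``, and the registry name are
# hypothetical): ``log_model`` both records the artifact and, when
# ``registered_model_name`` is supplied, registers a new model version.
#
#   with mlflow.start_run():
#       model_info = mlflow.diviner.log_model(
#           diviner_model=model,
#           artifact_path="diviner_model",
#           registered_model_name="grouped_prophet_forecaster",
#       )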
class _DivinerModelWrapper:
    def __init__(self, diviner_model):
        self.diviner_model = diviner_model

    def get_raw_model(self):
        """
        Returns the underlying model.
        """
        return self.diviner_model

    def predict(self, dataframe, params: Optional[dict[str, Any]] = None) -> pd.DataFrame:
        """A method that allows a pyfunc implementation of this flavor to generate forecasted
        values from the end of a trained Diviner model's training series per group.

        The implementation here encapsulates a config-based switch of method calling. In short:

        * If the ``DataFrame`` supplied to this method contains a column ``groups`` whose
          first row of data is of type List[tuple[str]] (containing the series-identifying
          group keys that were generated to identify a single underlying model during
          training), the caller will resolve to the method ``predict_groups()`` in each of
          the underlying wrapped libraries (i.e., ``GroupedProphet.predict_groups()``).
        * If the ``DataFrame`` supplied does not contain the column name ``groups``, then the
          specific forecasting method that is primitive-driven (for ``GroupedProphet``, the
          ``predict()`` method mirrors that of ``Prophet``'s, requiring a ``DataFrame``
          submitted with explicit datetime values per group, which is not a tenable
          implementation for pyfunc or RESTful serving) is utilized. For ``GroupedProphet``,
          this is the ``.forecast()`` method, while for ``GroupedPmdarima``, this is the
          ``.predict()`` method.

        Args:
            dataframe: A ``pandas.DataFrame`` that contains the required configuration for
                the appropriate ``Diviner`` type. For example, for
                ``GroupedProphet.forecast()``:

                - horizon : int
                - frequency: str

                    predict_conf = pd.DataFrame({"horizon": 30, "frequency": "D"}, index=[0])

                    forecast = pyfunc.load_pyfunc(model_uri=model_path).predict(predict_conf)

                will generate 30 days of forecasted values for each group that the model was
                trained on.
            params: Additional parameters to pass to the model for inference.

        Returns:
            A Pandas DataFrame containing the forecasted values for each group key that was
            either trained or declared as a subset with a ``groups`` entry in the
            ``dataframe`` configuration argument.
        """
        from diviner import GroupedPmdarima, GroupedProphet

        schema = dataframe.columns.values.tolist()
        conf = dataframe.to_dict(orient="index").get(0)

        # required parameter extraction and validation
        horizon = conf.get("horizon", None)
        n_periods = conf.get("n_periods", None)
        if n_periods and horizon and n_periods != horizon:
            raise MlflowException(
                "The provided prediction configuration contains both `n_periods` and "
                "`horizon` with different values. Please provide only one of these integer "
                "values.",
                error_code=INVALID_PARAMETER_VALUE,
            )
        else:
            if not n_periods and horizon:
                n_periods = horizon

        if not n_periods:
            raise MlflowException(
                "The provided prediction configuration Pandas DataFrame does not contain "
                "either the `n_periods` or `horizon` columns. At least one of these must be "
                f"specified with a valid integer value. Configuration schema: {schema}.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        if not isinstance(n_periods, int):
            raise MlflowException(
                "The `n_periods` column contains invalid data. Supplied type must be an "
                f"integer. Type supplied: {type(n_periods)}",
                error_code=INVALID_PARAMETER_VALUE,
            )

        frequency = conf.get("frequency", None)

        if isinstance(self.diviner_model, GroupedProphet) and not frequency:
            raise MlflowException(
                "Diviner's GroupedProphet model requires a `frequency` value to be submitted "
                "in Pandas date_range format. The submitted configuration Pandas DataFrame "
                f"does not contain a `frequency` column. Configuration schema: {schema}.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        predict_col = conf.get("predict_col", None)

        predict_groups = conf.get("groups", None)

        if predict_groups and not isinstance(predict_groups, list):
            raise MlflowException(
                "Specifying a group subset for prediction requires groups to be defined as a "
                "List[(Tuple|List)[<group_keys>]]. Submitted group type: "
                f"{type(predict_groups)}.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        # NB: json serialization of a tuple converts the tuple to a List. Diviner requires a
        # List of Tuples to be input to the group_prediction API. This conversion is for
        # utilizing the pyfunc flavor through the serving API.
        if predict_groups and not isinstance(predict_groups[0], tuple):
            predict_groups = [tuple(group) for group in predict_groups]

        if isinstance(self.diviner_model, GroupedProphet):
            # We're wrapping two different endpoints to Diviner here for the pyfunc
            # implementation. Since we're limited by a single endpoint, we can address
            # redirecting to the method ``predict_groups()``, which will allow for a subset
            # of groups to be forecasted if the prediction configuration DataFrame contains a
            # List[tuple[str]] in the ``groups`` column. If this column is not present, all
            # groups will be used to generate forecasts, utilizing the less computationally
            # complex method ``forecast``.
            if not predict_groups:
                prediction_df = self.diviner_model.forecast(
                    horizon=n_periods, frequency=frequency
                )
            else:
                group_kwargs = {k: v for k, v in conf.items() if k in {"predict_col", "on_error"}}
                prediction_df = self.diviner_model.predict_groups(
                    groups=predict_groups, horizon=n_periods, frequency=frequency, **group_kwargs
                )
            if predict_col is not None:
                prediction_df.rename(columns={"yhat": predict_col}, inplace=True)
        elif isinstance(self.diviner_model, GroupedPmdarima):
            # As above, we're redirecting the prediction request to one of two different
            # methods for ``Diviner``'s pmdarima implementation. If the ``groups`` column is
            # present with a list of tuples of keys to look up, ``predict_groups()`` will be
            # used. Otherwise, the standard ``predict()`` method will be called to generate
            # forecasts for all groups that were trained on.
            restricted_keys = {"n_periods", "horizon", "frequency", "groups"}
            predict_conf = {k: v for k, v in conf.items() if k not in restricted_keys}
            if not predict_groups:
                prediction_df = self.diviner_model.predict(n_periods=n_periods, **predict_conf)
            else:
                prediction_df = self.diviner_model.predict_groups(
                    groups=predict_groups, n_periods=n_periods, **predict_conf
                )
        else:
            raise MlflowException(
                f"The Diviner model instance type '{type(self.diviner_model)}' is not "
                f"supported in version {mlflow.__version__} of MLflow.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        return prediction_df
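

# Pyfunc inference sketch (``model_uri`` and the group keys are hypothetical):
# the prediction configuration is a single-row DataFrame; adding a ``groups``
# column restricts the forecast to a subset of the trained group keys.
#
#   import pandas as pd
#   import mlflow.pyfunc
#
#   loaded = mlflow.pyfunc.load_model(model_uri)
#   predict_conf = pd.DataFrame(
#       {"horizon": 30, "frequency": "D", "groups": [[("NE", "NY"), ("SE", "FL")]]},
#       index=[0],
#   )
#   forecast = loaded.predict(predict_conf)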