"""
The ``mlflow.paddle`` module provides an API for logging and loading paddle models.
This module exports paddle models with the following flavors:
Paddle (native) format
This is the main flavor that can be loaded back into paddle.
:py:mod:`mlflow.pyfunc`
Produced for use by generic pyfunc-based deployment tools and batch inference.
NOTE: The `mlflow.pyfunc` flavor is only added for paddle models that define `predict()`,
since `predict()` is required for pyfunc model inference.
"""
import logging
import os
from typing import Any, Optional
import yaml
import mlflow
from mlflow import pyfunc
from mlflow.models import Model, ModelInputExample, ModelSignature
from mlflow.models.model import MLMODEL_FILE_NAME
from mlflow.models.signature import _infer_signature_from_input_example
from mlflow.models.utils import _save_example
from mlflow.tracking._model_registry import DEFAULT_AWAIT_MAX_SLEEP_SECONDS
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils.autologging_utils import autologging_integration, safe_patch
from mlflow.utils.docstring_utils import LOG_MODEL_PARAM_DOCS, format_docstring
from mlflow.utils.environment import (
_CONDA_ENV_FILE_NAME,
_CONSTRAINTS_FILE_NAME,
_PYTHON_ENV_FILE_NAME,
_REQUIREMENTS_FILE_NAME,
_mlflow_conda_env,
_process_conda_env,
_process_pip_requirements,
_PythonEnv,
_validate_env_arguments,
)
from mlflow.utils.file_utils import write_to
from mlflow.utils.model_utils import (
_add_code_from_conf_to_system_path,
_get_flavor_configuration,
_validate_and_copy_code_paths,
_validate_and_prepare_target_save_path,
)
from mlflow.utils.requirements_utils import _get_pinned_requirement
# Flavor key used by this module when registering and looking up the paddle flavor
# configuration of an MLflow Model.
FLAVOR_NAME = "paddle"
# Name, relative to the model directory, that is joined onto the save path and handed
# to paddle as the target for the serialized model artifacts.
_MODEL_DATA_SUBPATH = "model"
# Module-level logger named after this module.
_logger = logging.getLogger(__name__)
def get_default_pip_requirements():
    """
    Build the baseline pip requirements for MLflow Models produced by this flavor.

    Returns:
        A list of default pip requirements for MLflow Models produced by this flavor.
        Calls to :func:`save_model()` and :func:`log_model()` produce a pip environment
        that, at minimum, contains these requirements.
    """
    # The PyPI distribution is named "paddlepaddle" while the importable module is "paddle".
    paddle_requirement = _get_pinned_requirement("paddlepaddle", module="paddle")
    return [paddle_requirement]
def get_default_conda_env():
    """
    Build the default Conda environment for this flavor.

    Returns:
        The default Conda environment for MLflow Models produced by calls to
        :func:`save_model()` and :func:`log_model()`.
    """
    pip_deps = get_default_pip_requirements()
    return _mlflow_conda_env(additional_pip_deps=pip_deps)
@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def save_model(
    pd_model,
    path,
    training=False,
    conda_env=None,
    code_paths=None,
    mlflow_model=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
):
    """
    Save a paddle model to a path on the local file system. Produces an MLflow Model
    containing the following flavors:

        - :py:mod:`mlflow.paddle`
        - :py:mod:`mlflow.pyfunc`. This function always writes the pyfunc flavor entry;
          pyfunc inference goes through ``_PaddleWrapper.predict``, which calls ``eval()``
          on the loaded model and then invokes the model directly on the input tensor.

    Args:
        pd_model: paddle model to be saved.
        path: Local path where the model is to be saved.
        training: Only valid when saving a model trained using the PaddlePaddle high level API.
            If set to True, the saved model supports both re-training and
            inference. If set to False, it only supports inference.
        conda_env: {{ conda_env }}
        code_paths: {{ code_paths }}
        mlflow_model: :py:mod:`mlflow.models.Model` this flavor is being added to.
        signature: {{ signature }}
        input_example: {{ input_example }}
        pip_requirements: {{ pip_requirements }}
        extra_pip_requirements: {{ extra_pip_requirements }}
        metadata: {{ metadata }}

    .. code-block:: python
        :caption: Example

        import mlflow.paddle
        import paddle
        from paddle.nn import Linear
        import paddle.nn.functional as F
        import numpy as np
        from sklearn.datasets import load_diabetes
        from sklearn.model_selection import train_test_split
        from sklearn import preprocessing


        def load_data():
            # diabetes regression dataset (10 numeric features, 1 target)
            X, y = load_diabetes(return_X_y=True)

            min_max_scaler = preprocessing.MinMaxScaler()
            X_min_max = min_max_scaler.fit_transform(X)
            X_normalized = preprocessing.scale(X_min_max, with_std=False)

            X_train, X_test, y_train, y_test = train_test_split(
                X_normalized, y, test_size=0.2, random_state=42
            )

            y_train = y_train.reshape(-1, 1)
            y_test = y_test.reshape(-1, 1)
            return np.concatenate((X_train, y_train), axis=1), np.concatenate(
                (X_test, y_test), axis=1
            )


        class Regressor(paddle.nn.Layer):
            def __init__(self):
                super().__init__()
                self.fc = Linear(in_features=10, out_features=1)

            @paddle.jit.to_static
            def forward(self, inputs):
                x = self.fc(inputs)
                return x


        model = Regressor()
        model.train()
        training_data, test_data = load_data()
        opt = paddle.optimizer.SGD(learning_rate=0.01, parameters=model.parameters())

        EPOCH_NUM = 10
        BATCH_SIZE = 10

        for epoch_id in range(EPOCH_NUM):
            np.random.shuffle(training_data)
            mini_batches = [
                training_data[k : k + BATCH_SIZE]
                for k in range(0, len(training_data), BATCH_SIZE)
            ]
            for iter_id, mini_batch in enumerate(mini_batches):
                x = np.array(mini_batch[:, :-1]).astype("float32")
                y = np.array(mini_batch[:, -1:]).astype("float32")
                features = paddle.to_tensor(x)
                targets = paddle.to_tensor(y)

                predicts = model(features)
                loss = F.square_error_cost(predicts, label=targets)
                avg_loss = paddle.mean(loss)
                if iter_id % 20 == 0:
                    print(f"epoch: {epoch_id}, iter: {iter_id}, loss is: {avg_loss.numpy()}")

                avg_loss.backward()
                opt.step()
                opt.clear_grad()

        mlflow.log_param("learning_rate", 0.01)
        mlflow.paddle.log_model(model, "model")
        sk_path_dir = "./test-out"
        mlflow.paddle.save_model(model, sk_path_dir)
        print(f"Model saved in run {mlflow.active_run().info.run_id}")
    """
    import paddle

    _validate_env_arguments(conda_env, pip_requirements, extra_pip_requirements)

    _validate_and_prepare_target_save_path(path)
    code_dir_subpath = _validate_and_copy_code_paths(code_paths, path)

    if mlflow_model is None:
        mlflow_model = Model()
    saved_example = _save_example(mlflow_model, input_example, path)

    # Signature resolution:
    #   * None + an input example -> infer the signature by running the wrapped model;
    #   * False                   -> explicit opt-out, no signature is stored;
    #   * anything else           -> used as given.
    if signature is None and saved_example is not None:
        wrapped_model = _PaddleWrapper(pd_model)
        signature = _infer_signature_from_input_example(saved_example, wrapped_model)
    elif signature is False:
        signature = None

    if signature is not None:
        mlflow_model.signature = signature
    if metadata is not None:
        mlflow_model.metadata = metadata

    model_data_subpath = _MODEL_DATA_SUBPATH
    output_path = os.path.join(path, model_data_subpath)

    # High-level `paddle.Model` objects have their own `save` (which honours `training`);
    # every other model object is exported through `paddle.jit.save`.
    if isinstance(pd_model, paddle.Model):
        pd_model.save(output_path, training=training)
    else:
        paddle.jit.save(pd_model, output_path)

    # Register the pyfunc flavor unconditionally; this module is named as the loader module.
    pyfunc.add_to_model(
        mlflow_model,
        loader_module="mlflow.paddle",
        model_path=model_data_subpath,
        conda_env=_CONDA_ENV_FILE_NAME,
        python_env=_PYTHON_ENV_FILE_NAME,
        code=code_dir_subpath,
    )
    # `load_model` reads the artifact location back from the "pickled_model" key.
    mlflow_model.add_flavor(
        FLAVOR_NAME,
        pickled_model=model_data_subpath,
        paddle_version=paddle.__version__,
        code=code_dir_subpath,
    )
    mlflow_model.save(os.path.join(path, MLMODEL_FILE_NAME))

    if conda_env is None:
        if pip_requirements is None:
            default_reqs = get_default_pip_requirements()
            # To ensure `_load_pyfunc` can successfully load the model during the dependency
            # inference, `mlflow_model.save` must be called beforehand to save an MLmodel file.
            inferred_reqs = mlflow.models.infer_pip_requirements(
                path,
                FLAVOR_NAME,
                fallback=default_reqs,
            )
            default_reqs = sorted(set(inferred_reqs).union(default_reqs))
        else:
            default_reqs = None
        conda_env, pip_requirements, pip_constraints = _process_pip_requirements(
            default_reqs,
            pip_requirements,
            extra_pip_requirements,
        )
    else:
        conda_env, pip_requirements, pip_constraints = _process_conda_env(conda_env)

    with open(os.path.join(path, _CONDA_ENV_FILE_NAME), "w") as f:
        yaml.safe_dump(conda_env, stream=f, default_flow_style=False)

    # Save `constraints.txt` if necessary
    if pip_constraints:
        write_to(os.path.join(path, _CONSTRAINTS_FILE_NAME), "\n".join(pip_constraints))

    # Save `requirements.txt`
    write_to(os.path.join(path, _REQUIREMENTS_FILE_NAME), "\n".join(pip_requirements))

    _PythonEnv.current().to_yaml(os.path.join(path, _PYTHON_ENV_FILE_NAME))
def load_model(model_uri, model=None, dst_path=None, **kwargs):
    """
    Load a paddle model from a local file or a run.

    Args:
        model_uri: The location, in URI format, of the MLflow model, for example:

            - ``/Users/me/path/to/local/model``
            - ``relative/path/to/local/model``
            - ``s3://my_bucket/path/to/model``
            - ``runs:/<mlflow_run_id>/run-relative/path/to/model``
            - ``models:/<model_name>/<model_version>``
            - ``models:/<model_name>/<stage>``

        model: Required when loading a `paddle.Model` model saved with `training=True`.
            When given, the saved state is loaded into this object via `model.load`
            and the same object is returned.
        dst_path: The local filesystem path to which to download the model artifact.
            This directory must already exist. If unspecified, a local output
            path will be created.
        kwargs: The keyword arguments to pass to `paddle.jit.load`
            or `model.load`.

    For more information about supported URI schemes, see
    `Referencing Artifacts <https://www.mlflow.org/docs/latest/concepts.html#
    artifact-locations>`_.

    Returns:
        A paddle model.

    Raises:
        TypeError: If `model` is given but is not a `paddle.Model`, or if `model` is
            given and no '.pdparams' file is present in the downloaded model directory.

    .. code-block:: python
        :caption: Example

        import mlflow.paddle

        pd_model = mlflow.paddle.load_model("runs:/96771d893a5e46159d9f3b49bf9013e2/pd_models")
        # use a numpy array to make predictions
        np_array = ...
        predictions = pd_model(np_array)
    """
    import paddle

    local_model_path = _download_artifact_from_uri(artifact_uri=model_uri, output_path=dst_path)
    flavor_conf = _get_flavor_configuration(model_path=local_model_path, flavor_name=FLAVOR_NAME)
    _add_code_from_conf_to_system_path(local_model_path, flavor_conf)
    artifacts_location = os.path.join(local_model_path, flavor_conf["pickled_model"])

    # No template model supplied: load through `paddle.jit.load`.
    if model is None:
        return paddle.jit.load(artifacts_location, **kwargs)

    if not isinstance(model, paddle.Model):
        raise TypeError(f"Invalid object type `{type(model)}` for `model`, must be `paddle.Model`")

    # `model.load` is only attempted when a '.pdparams' file sits in the model directory.
    if not _contains_pdparams(local_model_path):
        raise TypeError(
            "This model can't be loaded via `model.load` because a '.pdparams' file "
            "doesn't exist. Please leave `model` unspecified to load the model via "
            "`paddle.jit.load` or set `training` to True when saving a model."
        )

    model.load(artifacts_location, **kwargs)
    return model
@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def log_model(
    pd_model,
    artifact_path,
    training=False,
    conda_env=None,
    code_paths=None,
    registered_model_name=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    await_registration_for=DEFAULT_AWAIT_MAX_SLEEP_SECONDS,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
):
    """
    Log a paddle model as an MLflow artifact for the current run. Produces an MLflow Model
    containing the following flavors:

        - :py:mod:`mlflow.paddle`
        - :py:mod:`mlflow.pyfunc`

    All arguments are forwarded to ``Model.log`` with this module as the flavor.

    Args:
        pd_model: paddle model to be saved.
        artifact_path: Run-relative artifact path.
        training: Only valid when saving a model trained using the PaddlePaddle high level API.
            If set to True, the saved model supports both re-training and
            inference. If set to False, it only supports inference.
        conda_env: {{ conda_env }}
        code_paths: {{ code_paths }}
        registered_model_name: If given, create a model version under
            ``registered_model_name``, also creating a registered model if one
            with the given name does not exist.
        signature: {{ signature }}
        input_example: {{ input_example }}
        await_registration_for: Number of seconds to wait for the model version to finish
            being created and is in ``READY`` status. By default, the function
            waits for five minutes. Specify 0 or None to skip waiting.
        pip_requirements: {{ pip_requirements }}
        extra_pip_requirements: {{ extra_pip_requirements }}
        metadata: {{ metadata }}

    Returns:
        A :py:class:`ModelInfo <mlflow.models.model.ModelInfo>` instance that contains the
        metadata of the logged model.

    .. code-block:: python
        :caption: Example

        import mlflow.paddle


        def load_data(): ...


        class Regressor: ...


        model = Regressor()
        model.train()
        training_data, test_data = load_data()
        opt = paddle.optimizer.SGD(learning_rate=0.01, parameters=model.parameters())

        EPOCH_NUM = 10
        BATCH_SIZE = 10

        for epoch_id in range(EPOCH_NUM):
            ...

        mlflow.log_param("learning_rate", 0.01)
        mlflow.paddle.log_model(model, "model")
        sk_path_dir = ...
        mlflow.paddle.save_model(model, sk_path_dir)
    """
    # Gather every keyword in one place, then hand the whole set to `Model.log`.
    log_kwargs = {
        "artifact_path": artifact_path,
        "flavor": mlflow.paddle,
        "pd_model": pd_model,
        "conda_env": conda_env,
        "code_paths": code_paths,
        "registered_model_name": registered_model_name,
        "signature": signature,
        "input_example": input_example,
        "await_registration_for": await_registration_for,
        "training": training,
        "pip_requirements": pip_requirements,
        "extra_pip_requirements": extra_pip_requirements,
        "metadata": metadata,
    }
    return Model.log(**log_kwargs)
def _load_pyfunc(path):
    """
    Loads PyFunc implementation. Called by ``pyfunc.load_model``.

    Args:
        path: Local filesystem path to the MLflow Model with the ``paddle`` flavor.

    Returns:
        A ``_PaddleWrapper`` around the model returned by :func:`load_model`.
    """
    native_model = load_model(path)
    return _PaddleWrapper(native_model)
class _PaddleWrapper:
"""
Wrapper class that creates a predict function such that
predict(data: pd.DataFrame) -> model's output as pd.DataFrame (pandas DataFrame)
"""
def __init__(self, pd_model):
self.pd_model = pd_model
def get_raw_model(self):
"""
Returns the underlying model.
"""
return self.pd_model
def predict(
self,
data,
params: Optional[dict[str, Any]] = None,
):
"""
Args:
data: Model input data.
params: Additional parameters to pass to the model for inference.
Returns:
Model predictions.
"""
import numpy as np
import paddle
import pandas as pd
if isinstance(data, pd.DataFrame):
inp_data = data.values.astype(np.float32)
elif isinstance(data, np.ndarray):
inp_data = data
elif isinstance(data, (list, dict)):
raise TypeError(
"The paddle flavor does not support List or Dict input types. "
"Please use a pandas.DataFrame or a numpy.ndarray"
)
else:
raise TypeError("Input data should be pandas.DataFrame or numpy.ndarray")
inp_data = np.squeeze(inp_data)
self.pd_model.eval()
predicted = self.pd_model(paddle.to_tensor(inp_data))
return pd.DataFrame(predicted.numpy())
def _contains_pdparams(path):
file_list = os.listdir(path)
return any(".pdparams" in file for file in file_list)
@autologging_integration(FLAVOR_NAME)
def autolog(
    log_every_n_epoch=1,
    log_models=True,
    disable=False,
    exclusive=False,
    silent=False,
    registered_model_name=None,
    extra_tags=None,
):
    """
    Enables (or disables) and configures autologging from PaddlePaddle to MLflow.

    Autologging is performed when the `fit` method of `paddle.Model`_ is called.

    .. _paddle.Model:
        https://www.paddlepaddle.org.cn/documentation/docs/en/api/paddle/Model_en.html

    Args:
        log_every_n_epoch: If specified, logs metrics once every `n` epochs. By default, metrics
            are logged after every epoch.
        log_models: If ``True``, trained models are logged as MLflow model artifacts.
            If ``False``, trained models are not logged.
        disable: If ``True``, disables the PaddlePaddle autologging integration.
            If ``False``, enables the PaddlePaddle autologging integration.
        exclusive: If ``True``, autologged content is not logged to user-created fluent runs.
            If ``False``, autologged content is logged to the active fluent run,
            which may be user-created.
        silent: If ``True``, suppress all event logs and warnings from MLflow during
            PaddlePaddle autologging. If ``False``, show all events and warnings during
            PaddlePaddle autologging.
        registered_model_name: If given, each time a model is trained, it is registered as a
            new model version of the registered model with this name.
            The registered model is created if it does not already exist.
        extra_tags: A dictionary of extra tags to set on each managed run created by autologging.

    .. code-block:: python
        :caption: Example

        import paddle
        import mlflow
        from mlflow import MlflowClient


        def show_run_data(run_id):
            run = mlflow.get_run(run_id)
            print(f"params: {run.data.params}")
            print(f"metrics: {run.data.metrics}")
            client = MlflowClient()
            artifacts = [f.path for f in client.list_artifacts(run.info.run_id, "model")]
            print(f"artifacts: {artifacts}")


        class LinearRegression(paddle.nn.Layer):
            def __init__(self):
                super().__init__()
                self.fc = paddle.nn.Linear(13, 1)

            def forward(self, feature):
                return self.fc(feature)


        train_dataset = paddle.text.datasets.UCIHousing(mode="train")
        eval_dataset = paddle.text.datasets.UCIHousing(mode="test")
        model = paddle.Model(LinearRegression())
        optim = paddle.optimizer.SGD(learning_rate=1e-2, parameters=model.parameters())
        model.prepare(optim, paddle.nn.MSELoss(), paddle.metric.Accuracy())
        mlflow.paddle.autolog()

        with mlflow.start_run() as run:
            model.fit(train_dataset, eval_dataset, batch_size=16, epochs=10)

        show_run_data(run.info.run_id)

    .. code-block:: text
        :caption: Output

        params: {
            "learning_rate": "0.01",
            "optimizer_name": "SGD",
        }
        metrics: {
            "loss": 17.482044,
            "step": 25.0,
            "acc": 0.0,
            "eval_step": 6.0,
            "eval_acc": 0.0,
            "eval_batch_size": 6.0,
            "batch_size": 4.0,
            "eval_loss": 24.717455,
        }
        artifacts: [
            "model/MLmodel",
            "model/conda.yaml",
            "model/model.pdiparams",
            "model/model.pdiparams.info",
            "model/model.pdmodel",
            "model/requirements.txt",
        ]
    """
    import paddle

    from mlflow.paddle._paddle_autolog import patched_fit

    # NOTE(review): apart from `extra_tags`, none of the parameters are read in this body;
    # presumably the `autologging_integration` decorator records them for `patched_fit`
    # to look up - confirm against the decorator's implementation.
    safe_patch(
        FLAVOR_NAME, paddle.Model, "fit", patched_fit, manage_run=True, extra_tags=extra_tags
    )