Source code for mlflow.projects

"""
The ``mlflow.projects`` module provides an API for running MLflow projects locally or remotely.
"""

import json
import logging
import os

import yaml

import mlflow.projects.databricks
import mlflow.utils.uri
from mlflow import tracking
from mlflow.entities import RunStatus
from mlflow.exceptions import ExecutionException, MlflowException
from mlflow.projects.backend import loader
from mlflow.projects.submitted_run import SubmittedRun
from mlflow.projects.utils import (
    MLFLOW_LOCAL_BACKEND_RUN_ID_CONFIG,
    PROJECT_BUILD_IMAGE,
    PROJECT_DOCKER_ARGS,
    PROJECT_DOCKER_AUTH,
    PROJECT_ENV_MANAGER,
    PROJECT_STORAGE_DIR,
    PROJECT_SYNCHRONOUS,
    fetch_and_validate_project,
    get_entry_point_command,
    get_or_create_run,
    get_run_env_vars,
    load_project,
)
from mlflow.tracking.fluent import _get_experiment_id
from mlflow.utils import env_manager as _EnvManager
from mlflow.utils.mlflow_tags import (
    MLFLOW_DOCKER_IMAGE_ID,
    MLFLOW_PROJECT_BACKEND,
    MLFLOW_PROJECT_ENV,
    MLFLOW_RUN_NAME,
)

# Module-level logger; its name is taken from this module's ``__name__``.
_logger = logging.getLogger(__name__)


def _resolve_experiment_id(experiment_name=None, experiment_id=None):
    """
    Resolve experiment.

    Verifies either one or other is specified - cannot be both selected.

    If ``experiment_name`` is provided and does not exist, an experiment
    of that name is created and its id is returned.

    Args:
        experiment_name: Name of experiment under which to launch the run.
        experiment_id: ID of experiment under which to launch the run.

    Returns:
        str
    """

    if experiment_name and experiment_id:
        raise MlflowException("Specify only one of 'experiment_name' or 'experiment_id'.")

    if experiment_id:
        return str(experiment_id)

    if experiment_name:
        client = tracking.MlflowClient()
        exp = client.get_experiment_by_name(experiment_name)
        if exp:
            return exp.experiment_id
        else:
            _logger.info("'%s' does not exist. Creating a new experiment", experiment_name)
            return client.create_experiment(experiment_name)

    return _get_experiment_id()


def _run(
    uri,
    experiment_id,
    entry_point,
    version,
    parameters,
    docker_args,
    backend_name,
    backend_config,
    storage_dir,
    env_manager,
    synchronous,
    run_name,
    build_image,
    docker_auth,
):
    """
    Helper that delegates to the project-running method corresponding to the passed-in backend.
    Returns a ``SubmittedRun`` corresponding to the project run.

    The "databricks" and "kubernetes" backends are handled inline in this function; every
    other backend name is resolved through ``loader.load_backend`` and executed via the
    loaded backend's ``run`` method.

    Args:
        uri: URI of the project to run.
        experiment_id: ID of the experiment under which the run is launched.
        entry_point: Name of the entry point to run within the project.
        version: Project version, forwarded to the backend / project fetching helper.
        parameters: Parameters for the entry point command.
        docker_args: Stored in ``backend_config`` under ``PROJECT_DOCKER_ARGS``.
        backend_name: Name of the execution backend.
        backend_config: Dictionary of backend configuration. NOTE: it is updated in place
            with the ``PROJECT_*`` settings derived from the other arguments.
        storage_dir: Stored in ``backend_config`` under ``PROJECT_STORAGE_DIR`` and used to
            build the entry point command for Kubernetes runs.
        env_manager: Stored in ``backend_config`` under ``PROJECT_ENV_MANAGER`` and forwarded
            to ``run_databricks``.
        synchronous: Stored in ``backend_config`` under ``PROJECT_SYNCHRONOUS``.
        run_name: If not ``None``, set as the ``MLFLOW_RUN_NAME`` tag of the run.
        build_image: Stored in ``backend_config`` under ``PROJECT_BUILD_IMAGE`` and forwarded
            to ``build_docker_image`` for Kubernetes runs.
        docker_auth: Stored in ``backend_config`` under ``PROJECT_DOCKER_AUTH`` and forwarded
            to ``build_docker_image`` for Kubernetes runs.

    Raises:
        ExecutionException: If ``backend_name`` is neither "databricks" nor "kubernetes" and
            no backend could be loaded for it.
    """
    tracking_store_uri = tracking.get_tracking_uri()
    backend_config[PROJECT_ENV_MANAGER] = env_manager
    backend_config[PROJECT_SYNCHRONOUS] = synchronous
    backend_config[PROJECT_DOCKER_ARGS] = docker_args
    backend_config[PROJECT_STORAGE_DIR] = storage_dir
    backend_config[PROJECT_BUILD_IMAGE] = build_image
    backend_config[PROJECT_DOCKER_AUTH] = docker_auth

    # TODO: remove this check once kubernetes execution has been refactored
    if backend_name not in {"databricks", "kubernetes"}:
        backend = loader.load_backend(backend_name)
        if not backend:
            # Fail fast: an unknown backend must not fetch the project or create a
            # tracking run before the error is reported.
            supported_backends = ["databricks", "kubernetes"] + list(
                loader.MLFLOW_BACKENDS.keys()
            )
            raise ExecutionException(
                f"Got unsupported execution mode {backend_name}. "
                f"Supported values: {supported_backends}"
            )

        submitted_run = backend.run(
            uri,
            entry_point,
            parameters,
            version,
            backend_config,
            tracking_store_uri,
            experiment_id,
        )
        tracking.MlflowClient().set_tag(
            submitted_run.run_id, MLFLOW_PROJECT_BACKEND, backend_name
        )
        if run_name is not None:
            tracking.MlflowClient().set_tag(submitted_run.run_id, MLFLOW_RUN_NAME, run_name)
        return submitted_run

    # From here on ``backend_name`` is either "databricks" or "kubernetes".
    work_dir = fetch_and_validate_project(uri, version, entry_point, parameters)
    project = load_project(work_dir)
    _validate_execution_environment(project, backend_name)

    active_run = get_or_create_run(
        None, uri, experiment_id, work_dir, version, entry_point, parameters
    )

    if run_name is not None:
        tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_RUN_NAME, run_name)

    if backend_name == "databricks":
        tracking.MlflowClient().set_tag(
            active_run.info.run_id, MLFLOW_PROJECT_BACKEND, "databricks"
        )
        from mlflow.projects.databricks import run_databricks, run_databricks_spark_job

        # Projects that declare a Databricks Spark job spec use the dedicated runner.
        if project.databricks_spark_job_spec is not None:
            return run_databricks_spark_job(
                remote_run=active_run,
                uri=uri,
                work_dir=work_dir,
                experiment_id=experiment_id,
                cluster_spec=backend_config,
                project_spec=project,
                entry_point=entry_point,
                parameters=parameters,
            )

        return run_databricks(
            remote_run=active_run,
            uri=uri,
            entry_point=entry_point,
            work_dir=work_dir,
            parameters=parameters,
            experiment_id=experiment_id,
            cluster_spec=backend_config,
            env_manager=env_manager,
        )

    # Kubernetes: build and push a docker image for the project, then submit a job for it.
    from mlflow.projects import kubernetes as kb
    from mlflow.projects.docker import (
        build_docker_image,
        validate_docker_env,
        validate_docker_installation,
    )

    tracking.MlflowClient().set_tag(active_run.info.run_id, MLFLOW_PROJECT_ENV, "docker")
    tracking.MlflowClient().set_tag(
        active_run.info.run_id, MLFLOW_PROJECT_BACKEND, "kubernetes"
    )
    validate_docker_env(project)
    validate_docker_installation()
    kube_config = _parse_kubernetes_config(backend_config)
    image = build_docker_image(
        work_dir=work_dir,
        repository_uri=kube_config["repository-uri"],
        base_image=project.docker_env.get("image"),
        run_id=active_run.info.run_id,
        build_image=build_image,
        docker_auth=docker_auth,
    )
    image_digest = kb.push_image_to_registry(image.tags[0])
    tracking.MlflowClient().set_tag(
        active_run.info.run_id, MLFLOW_DOCKER_IMAGE_ID, image_digest
    )
    return kb.run_kubernetes_job(
        project.name,
        active_run,
        image.tags[0],
        image_digest,
        get_entry_point_command(project, entry_point, parameters, storage_dir),
        get_run_env_vars(
            run_id=active_run.info.run_uuid, experiment_id=active_run.info.experiment_id
        ),
        kube_config.get("kube-context", None),
        kube_config["kube-job-template"],
    )


def run(
    uri,
    entry_point="main",
    version=None,
    parameters=None,
    docker_args=None,
    experiment_name=None,
    experiment_id=None,
    backend="local",
    backend_config=None,
    storage_dir=None,
    synchronous=True,
    run_id=None,
    run_name=None,
    env_manager=None,
    build_image=False,
    docker_auth=None,
):
    """
    Run an MLflow project. The project can be local or stored at a Git URI.

    MLflow provides built-in support for running projects locally or remotely on a Databricks or
    Kubernetes cluster. You can also run projects against other targets by installing an
    appropriate third-party plugin. See `Community Plugins <../plugins.html#community-plugins>`_
    for more information.

    For information on using this method in chained workflows, see `Building Multistep Workflows
    <../projects.html#building-multistep-workflows>`_.

    Raises:
        :py:class:`mlflow.exceptions.ExecutionException`: If a run launched in blocking mode
            is unsuccessful.

    Args:
        uri: URI of project to run. A local filesystem path
            or a Git repository URI (e.g. https://github.com/mlflow/mlflow-example)
            pointing to a project directory containing an MLproject file.
        entry_point: Entry point to run within the project. If no entry point with the
            specified name is found, runs the project file ``entry_point`` as a script,
            using "python" to run ``.py`` files and the default shell (specified by
            environment variable ``$SHELL``) to run ``.sh`` files.
        version: For Git-based projects, either a commit hash or a branch name.
        parameters: Parameters (dictionary) for the entry point command.
        docker_args: Arguments (dictionary) for the docker command.
        experiment_name: Name of experiment under which to launch the run.
        experiment_id: ID of experiment under which to launch the run.
        backend: Execution backend for the run: MLflow provides built-in support for "local",
            "databricks", and "kubernetes" (experimental) backends. If running against
            Databricks, will run against a Databricks workspace determined as follows:
            if a Databricks tracking URI of the form ``databricks://profile`` has been set
            (e.g. by setting the MLFLOW_TRACKING_URI environment variable), will run
            against the workspace specified by <profile>. Otherwise, runs against the
            workspace specified by the default Databricks CLI profile.
        backend_config: A dictionary, or a path to a JSON file (must end in '.json'), which
            will be passed as config to the backend. The exact content which should be
            provided is different for each execution backend and is documented
            at https://www.mlflow.org/docs/latest/projects.html.
        storage_dir: Used only if ``backend`` is "local". MLflow downloads artifacts from
            distributed URIs passed to parameters of type ``path`` to subdirectories of
            ``storage_dir``.
        synchronous: Whether to block while waiting for a run to complete. Defaults to True.
            Note that if ``synchronous`` is False and ``backend`` is "local", this
            method will return, but the current process will block when exiting until
            the local run completes. If the current process is interrupted, any
            asynchronous runs launched via this method will be terminated. If
            ``synchronous`` is True and the run fails, the current process will
            error out as well.
        run_id: Note: this argument is used internally by the MLflow project APIs and should
            not be specified. If specified, the run ID will be used instead of creating a
            new run.
        run_name: The name to give the MLflow Run associated with the project execution.
            If ``None``, the MLflow Run name is left unset.
        env_manager: Specify an environment manager to create a new environment for the run and
            install project dependencies within that environment. The following values
            are supported:

            - local: use the local environment
            - virtualenv: use virtualenv (and pyenv for Python version management)
            - conda: use conda

            If unspecified, MLflow automatically determines the environment manager to
            use by inspecting files in the project directory. For example, if
            ``python_env.yaml`` is present, virtualenv will be used.
        build_image: Whether to build a new docker image of the project or to reuse an existing
            image. Default: False (reuse an existing image)
        docker_auth: A dictionary representing information to authenticate with a Docker
            registry. See `docker.client.DockerClient.login
            <https://docker-py.readthedocs.io/en/stable/client.html#docker.client.DockerClient.login>`_
            for available options.

    Returns:
        :py:class:`mlflow.projects.SubmittedRun` exposing information (e.g. run ID)
        about the launched run.

    .. code-block:: python
        :caption: Example

        import mlflow

        project_uri = "https://github.com/mlflow/mlflow-example"
        params = {"alpha": 0.5, "l1_ratio": 0.01}

        # Run MLflow project and create a reproducible conda environment
        # on a local host
        mlflow.run(project_uri, parameters=params)

    .. code-block:: text
        :caption: Output

        ...
        ...
        Elasticnet model (alpha=0.500000, l1_ratio=0.010000):
        RMSE: 0.788347345611717
        MAE: 0.6155576449938276
        R2: 0.19729662005412607
        ...
        mlflow.projects: === Run (ID '6a5109febe5e4a549461e149590d0a7c') succeeded ===
    """
    backend_config_dict = backend_config if backend_config is not None else {}
    # A non-dict value whose extension is ".json" is treated as a path to a JSON config file.
    # ``isinstance`` (rather than an exact type comparison) lets dict subclasses such as
    # ``OrderedDict`` be used directly instead of being mistaken for a path.
    if (
        backend_config
        and not isinstance(backend_config, dict)
        and os.path.splitext(backend_config)[-1] == ".json"
    ):
        with open(backend_config) as handle:
            try:
                backend_config_dict = json.load(handle)
            except ValueError:
                _logger.error(
                    "Error when attempting to load and parse JSON cluster spec from file %s",
                    backend_config,
                )
                raise

    if env_manager is not None:
        _EnvManager.validate(env_manager)

    if backend == "databricks":
        # NOTE(review): the caller-supplied ``backend_config`` (not the parsed dict) is handed
        # to the validator, as in the original code - confirm this is intended.
        mlflow.projects.databricks.before_run_validations(mlflow.get_tracking_uri(), backend_config)
    elif backend == "local" and run_id is not None:
        backend_config_dict[MLFLOW_LOCAL_BACKEND_RUN_ID_CONFIG] = run_id

    experiment_id = _resolve_experiment_id(
        experiment_name=experiment_name, experiment_id=experiment_id
    )

    submitted_run_obj = _run(
        uri=uri,
        experiment_id=experiment_id,
        entry_point=entry_point,
        version=version,
        parameters=parameters,
        docker_args=docker_args,
        backend_name=backend,
        backend_config=backend_config_dict,
        env_manager=env_manager,
        storage_dir=storage_dir,
        synchronous=synchronous,
        run_name=run_name,
        build_image=build_image,
        docker_auth=docker_auth,
    )
    # In blocking mode, wait for completion; a failed run raises from ``_wait_for``.
    if synchronous:
        _wait_for(submitted_run_obj)
    return submitted_run_obj
def _wait_for(submitted_run_obj):
    """
    Wait on the passed-in submitted run, reporting its status to the tracking server.

    Args:
        submitted_run_obj: The submitted run to wait on. Its ``run_id``, ``wait()`` and
            ``cancel()`` members are used.

    Raises:
        ExecutionException: If ``submitted_run_obj.wait()`` returns a falsy value.
        KeyboardInterrupt: Re-raised after the run has been cancelled and marked as failed.
    """
    run_id = submitted_run_obj.run_id
    active_run = None
    # Note: there's a small chance we fail to report the run's status to the tracking server if
    # we're interrupted before we reach the try block below
    try:
        active_run = tracking.MlflowClient().get_run(run_id) if run_id is not None else None
        if submitted_run_obj.wait():
            _logger.info("=== Run (ID '%s') succeeded ===", run_id)
            _maybe_set_run_terminated(active_run, "FINISHED")
        else:
            _maybe_set_run_terminated(active_run, "FAILED")
            raise ExecutionException(f"Run (ID '{run_id}') failed")
    except KeyboardInterrupt:
        _logger.error("=== Run (ID '%s') interrupted, cancelling run ===", run_id)
        submitted_run_obj.cancel()
        _maybe_set_run_terminated(active_run, "FAILED")
        raise


def _maybe_set_run_terminated(active_run, status):
    """
    If the passed-in active run is defined and still running (i.e. hasn't already been terminated
    within user code), mark it as terminated with the passed-in status.

    Args:
        active_run: The run to update, or ``None`` to do nothing.
        status: Status passed to ``MlflowClient.set_terminated``.
    """
    if active_run is None:
        return
    run_id = active_run.info.run_id
    # Re-read the run so that the status check reflects the tracking server's current state.
    cur_status = tracking.MlflowClient().get_run(run_id).info.status
    if RunStatus.is_terminated(cur_status):
        return
    tracking.MlflowClient().set_terminated(run_id, status)


def _validate_execution_environment(project, backend):
    """
    Reject project/backend combinations that cannot be executed.

    Raises:
        ExecutionException: If the project defines a docker environment and ``backend`` is
            "databricks".
    """
    if project.docker_env and backend == "databricks":
        raise ExecutionException(
            "Running docker-based projects on Databricks is not yet supported."
        )


def _parse_kubernetes_config(backend_config):
    """
    Validate the Kubernetes backend configuration and load the job template it points to.

    Args:
        backend_config: Dictionary of backend configuration. It must contain
            'kube-job-template-path' (path to an existing YAML file) and 'repository-uri';
            'kube-context' is optional.

    Returns:
        A copy of ``backend_config`` with an additional 'kube-job-template' entry holding the
        parsed content of the YAML job template.

    Raises:
        ExecutionException: If ``backend_config`` is empty, a required key is missing, or the
            job template path does not exist.
    """
    if not backend_config:
        raise ExecutionException("Backend_config file not found.")
    # Work on a copy so the parsed template is not written into the caller's dictionary.
    kube_config = backend_config.copy()
    if "kube-job-template-path" not in backend_config:
        raise ExecutionException(
            "'kube-job-template-path' attribute must be specified in backend_config."
        )
    kube_job_template = backend_config["kube-job-template-path"]
    if not os.path.exists(kube_job_template):
        raise ExecutionException(f"Could not find 'kube-job-template-path': {kube_job_template}")
    with open(kube_job_template) as job_template:
        kube_config["kube-job-template"] = yaml.safe_load(job_template.read())
    if "kube-context" not in backend_config:
        _logger.debug(
            "Could not find kube-context in backend_config."
            " Using current context or in-cluster config."
        )
    if "repository-uri" not in backend_config:
        raise ExecutionException("Could not find 'repository-uri' in backend_config.")
    return kube_config


__all__ = ["run", "SubmittedRun"]