import logging
import os
import signal
from abc import abstractmethod
from mlflow.entities import RunStatus
from mlflow.utils.annotations import developer_stable
# Module-level logger; ``LocalSubmittedRun.cancel`` uses it to report termination failures.
_logger = logging.getLogger(__name__)
@developer_stable
class SubmittedRun:
    """
    Wrapper around an MLflow project run (e.g. a subprocess running an entry point
    command or a Databricks job run) and exposing methods for waiting on and cancelling the run.
    This class defines the interface that the MLflow project runner uses to manage the lifecycle
    of runs launched in different environments (e.g. runs launched locally or on Databricks).

    ``SubmittedRun`` is not thread-safe. That is, concurrent calls to wait() / cancel()
    from multiple threads may inadvertently kill resources (e.g. local processes) unrelated to the
    run.

    NOTE:
        Subclasses of ``SubmittedRun`` must expose a ``run_id`` member containing the
        run's MLflow run ID.
    """

    @abstractmethod
    def wait(self):
        """
        Wait for the run to finish, returning True if the run succeeded and False otherwise. Note
        that in some cases (e.g. remote execution on Databricks), we may wait until the remote job
        completes rather than until the MLflow run completes.
        """

    @abstractmethod
    def get_status(self):
        """
        Get status of the run.
        """

    @abstractmethod
    def cancel(self):
        """
        Cancel the run (interrupts the command subprocess, cancels the Databricks run, etc) and
        waits for it to terminate. The MLflow run status may not be set correctly
        upon run cancellation.
        """

    @property
    @abstractmethod
    def run_id(self):
        """The MLflow run ID of this run; subclasses must provide it."""
class LocalSubmittedRun(SubmittedRun):
    """
    Instance of ``SubmittedRun`` corresponding to a subprocess launched to run an entry point
    command locally.
    """

    def __init__(self, run_id, command_proc):
        """
        Args:
            run_id: MLflow run ID associated with the launched command.
            command_proc: Handle to the launched child process. This class calls its
                ``wait()``, ``poll()`` and ``terminate()`` methods and reads its ``pid``
                attribute (presumably a ``subprocess.Popen`` -- confirm against the caller).
        """
        super().__init__()
        self._run_id = run_id
        self.command_proc = command_proc

    @property
    def run_id(self):
        """The MLflow run ID passed to the constructor."""
        return self._run_id

    def wait(self):
        """Block until the child process exits; return True iff its exit code is 0."""
        return self.command_proc.wait() == 0

    def cancel(self):
        """
        Terminate the child process if it is still running, then wait for it to exit.

        If the child is the leader of its own process group, SIGTERM is sent to the whole
        group; otherwise only the child itself is terminated. On platforms that do not provide
        ``os.getpgid`` / ``os.killpg`` (they are Unix-only), only the child is terminated.
        ``OSError`` raised while terminating is logged and ignored.
        """
        proc = self.command_proc
        # Nothing to interrupt if the child process has already exited
        if proc.poll() is not None:
            return

        # Process-group APIs are Unix-only; look them up defensively so that cancelling a run
        # on other platforms falls back to terminate() instead of raising AttributeError.
        getpgid = getattr(os, "getpgid", None)
        killpg = getattr(os, "killpg", None)

        # Kill the process tree rooted at the child if it's the leader of its own process
        # group, otherwise just kill the child
        try:
            if getpgid is not None and killpg is not None and proc.pid == getpgid(proc.pid):
                killpg(proc.pid, signal.SIGTERM)
            else:
                proc.terminate()
        except OSError:
            # The child process may have exited before we attempted to terminate it, so we
            # ignore OSErrors raised during child process termination
            _logger.info(
                "Failed to terminate child process (PID %s) corresponding to MLflow "
                "run with ID %s. The process may have already exited.",
                proc.pid,
                self._run_id,
            )
        proc.wait()

    def _get_status(self):
        """Map the child's current exit code (from ``poll()``) to a ``RunStatus`` value."""
        exit_code = self.command_proc.poll()
        # poll() returning None means the process has not exited yet
        if exit_code is None:
            return RunStatus.RUNNING
        if exit_code == 0:
            return RunStatus.FINISHED
        return RunStatus.FAILED

    def get_status(self):
        """Return the run's current status converted with ``RunStatus.to_string``."""
        return RunStatus.to_string(self._get_status())