Source code for mlflow.models.resources

import os
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Optional

import yaml

DEFAULT_API_VERSION = "1"


[docs]class ResourceType(Enum): """ Enum to define the different types of resources needed to serve a model. """ UC_CONNECTION = "uc_connection" VECTOR_SEARCH_INDEX = "vector_search_index" SERVING_ENDPOINT = "serving_endpoint" SQL_WAREHOUSE = "sql_warehouse" FUNCTION = "function" GENIE_SPACE = "genie_space" TABLE = "table"
[docs]class Resource(ABC): """ Base class for defining the resources needed to serve a model. Args: type (ResourceType): The resource type. target_uri (str): The target URI where these resources are hosted. """ @property @abstractmethod def type(self) -> ResourceType: """ The resource type (must be defined by subclasses). """ @property @abstractmethod def target_uri(self) -> str: """ The target URI where the resource is hosted (must be defined by subclasses). """
[docs] @abstractmethod def to_dict(self): """ Convert the resource to a dictionary. Subclasses must implement this method. """
[docs] @classmethod @abstractmethod def from_dict(cls, data: dict[str, str]): """ Convert the dictionary to a Resource. Subclasses must implement this method. """
def __eq__(self, other: Any): if not isinstance(other, Resource): return False return self.to_dict() == other.to_dict()
class DatabricksResource(Resource, ABC): """ Base class to define all the Databricks resources to serve a model. Example usage: https://docs.databricks.com/en/generative-ai/log-agent.html#specify-resources-for-pyfunc-or-langchain-agent """ @property def target_uri(self) -> str: return "databricks" @property def type(self) -> ResourceType: raise NotImplementedError("Subclasses must implement the 'type' property.") def __init__(self, name: str, on_behalf_of_user: Optional[bool] = None): self.name = name self.on_behalf_of_user = on_behalf_of_user def to_dict(self): result = {self.type.value: [{"name": self.name}]} if self.on_behalf_of_user is not None: result[self.type.value][0]["on_behalf_of_user"] = self.on_behalf_of_user return result @classmethod def from_dict(cls, data: dict[str, str]): return cls(data["name"], data.get("on_behalf_of_user")) class DatabricksUCConnection(DatabricksResource): """ Define a Databricks UC Connection used to serve a model. Args: connection_name (str): The name of the databricks UC connection used to create the tool which was used to build the model. on_behalf_of_user (Optional[bool]): If True, the resource is accessed with with the permission of the invoker of the model in the serving endpoint. If set to None or False, the resources is accesssed with the permissions of the creator """ @property def type(self) -> ResourceType: return ResourceType.UC_CONNECTION def __init__(self, connection_name: str, on_behalf_of_user: Optional[bool] = None): super().__init__(connection_name, on_behalf_of_user) class DatabricksServingEndpoint(DatabricksResource): """ Define Databricks LLM endpoint resource to serve a model. Args: endpoint_name (str): The name of all the databricks endpoints used by the model. on_behalf_of_user (Optional[bool]): If True, the resource is accessed with with the permission of the invoker of the model in the serving endpoint. If set to None or False, the resources is accesssed with the permissions of the creator """ @property def type(self) -> ResourceType: return ResourceType.SERVING_ENDPOINT def __init__(self, endpoint_name: str, on_behalf_of_user: Optional[bool] = None): super().__init__(endpoint_name, on_behalf_of_user) class DatabricksVectorSearchIndex(DatabricksResource): """ Define Databricks vector search index name resource to serve a model. Args: index_name (str): The name of the databricks vector search index used by the model on_behalf_of_user (Optional[bool]): If True, the resource is accessed with with the permission of the invoker of the model in the serving endpoint. If set to None or False, the resources is accesssed with the permissions of the creator """ @property def type(self) -> ResourceType: return ResourceType.VECTOR_SEARCH_INDEX def __init__(self, index_name: str, on_behalf_of_user: Optional[bool] = None): super().__init__(index_name, on_behalf_of_user) class DatabricksSQLWarehouse(DatabricksResource): """ Define Databricks sql warehouse resource to serve a model. Args: warehouse_id (str): The id of the sql warehouse used by the model on_behalf_of_user (Optional[bool]): If True, the resource is accessed with with the permission of the invoker of the model in the serving endpoint. If set to None or False, the resources is accesssed with the permissions of the creator """ @property def type(self) -> ResourceType: return ResourceType.SQL_WAREHOUSE def __init__(self, warehouse_id: str, on_behalf_of_user: Optional[bool] = None): super().__init__(warehouse_id, on_behalf_of_user) class DatabricksFunction(DatabricksResource): """ Define Databricks UC Function to serve a model. Args: function_name (str): The name of the function used by the model on_behalf_of_user (Optional[bool]): If True, the resource is accessed with with the permission of the invoker of the model in the serving endpoint. If set to None or False, the resources is accesssed with the permissions of the creator """ @property def type(self) -> ResourceType: return ResourceType.FUNCTION def __init__(self, function_name: str, on_behalf_of_user: Optional[bool] = None): super().__init__(function_name, on_behalf_of_user) class DatabricksGenieSpace(DatabricksResource): """ Define a Databricks Genie Space to serve a model. Args: genie_space_id (str): The genie space id on_behalf_of_user (Optional[bool]): If True, the resource is accessed with with the permission of the invoker of the model in the serving endpoint. If set to None or False, the resources is accesssed with the permissions of the creator """ @property def type(self) -> ResourceType: return ResourceType.GENIE_SPACE def __init__(self, genie_space_id: str, on_behalf_of_user: Optional[bool] = None): super().__init__(genie_space_id, on_behalf_of_user) class DatabricksTable(DatabricksResource): """ Defines a Databricks Unity Catalog (UC) Table, which establishes table dependencies for Model Serving. This table will be referenced in Agent Model Serving endpoints, where an agent queries a SQL table via either Genie or UC Functions. Args: table_name (str): The name of the table used by the model on_behalf_of_user (Optional[bool]): If True, the resource is accessed with with the permission of the invoker of the model in the serving endpoint. If set to None or False, the resources is accesssed with the permissions of the creator """ @property def type(self) -> ResourceType: return ResourceType.TABLE def __init__(self, table_name: str, on_behalf_of_user: Optional[bool] = None): super().__init__(table_name, on_behalf_of_user) def _get_resource_class_by_type(target_uri: str, resource_type: ResourceType): resource_classes = { "databricks": { ResourceType.UC_CONNECTION.value: DatabricksUCConnection, ResourceType.SERVING_ENDPOINT.value: DatabricksServingEndpoint, ResourceType.VECTOR_SEARCH_INDEX.value: DatabricksVectorSearchIndex, ResourceType.SQL_WAREHOUSE.value: DatabricksSQLWarehouse, ResourceType.FUNCTION.value: DatabricksFunction, ResourceType.GENIE_SPACE.value: DatabricksGenieSpace, ResourceType.TABLE.value: DatabricksTable, } } resource = resource_classes.get(target_uri) if resource is None: raise ValueError(f"Unsupported target URI: {target_uri}") return resource.get(resource_type) class _ResourceBuilder: """ Private builder class to build the resources dictionary. """ @staticmethod def from_resources( resources: list[Resource], api_version: str = DEFAULT_API_VERSION ) -> dict[str, dict[ResourceType, list[dict]]]: resource_dict = {} for resource in resources: resource_data = resource.to_dict() for resource_type, values in resource_data.items(): target_dict = resource_dict.setdefault(resource.target_uri, {}) target_list = target_dict.setdefault(resource_type, []) target_list.extend(values) resource_dict["api_version"] = api_version return resource_dict @staticmethod def from_dict(data) -> dict[str, dict[ResourceType, list[dict]]]: resources = [] api_version = data.pop("api_version") if api_version == "1": for target_uri, config in data.items(): for resource_type, values in config.items(): resource_class = _get_resource_class_by_type(target_uri, resource_type) if resource_class: resources.extend(resource_class.from_dict(value) for value in values) else: raise ValueError(f"Unsupported resource type: {resource_type}") else: raise ValueError(f"Unsupported API version: {api_version}") return _ResourceBuilder.from_resources(resources, api_version) @staticmethod def from_yaml_file(path: str) -> dict[str, dict[ResourceType, list[dict]]]: if not os.path.exists(path): raise OSError(f"No such file or directory: '{path}'") path = os.path.abspath(path) with open(path) as file: data = yaml.safe_load(file) return _ResourceBuilder.from_dict(data)