import json
import logging
import os
import pathlib
from enum import Enum
from pathlib import Path
from typing import Any, Optional, Union
import pydantic
import yaml
from packaging.version import Version
from pydantic import ConfigDict, Field, ValidationError, root_validator, validator
from pydantic.json import pydantic_encoder
from mlflow.exceptions import MlflowException
from mlflow.gateway.base_models import ConfigModel, LimitModel, ResponseModel
from mlflow.gateway.constants import (
MLFLOW_AI_GATEWAY_MOSAICML_CHAT_SUPPORTED_MODEL_PREFIXES,
MLFLOW_GATEWAY_ROUTE_BASE,
MLFLOW_QUERY_SUFFIX,
)
from mlflow.gateway.utils import (
check_configuration_deprecated_fields,
check_configuration_route_name_collisions,
is_valid_ai21labs_model,
is_valid_endpoint_name,
is_valid_mosiacml_chat_model,
)
from mlflow.utils import IS_PYDANTIC_V2_OR_NEWER
_logger = logging.getLogger(__name__)
if IS_PYDANTIC_V2_OR_NEWER:
from pydantic import SerializeAsAny
[docs]class Provider(str, Enum):
OPENAI = "openai"
ANTHROPIC = "anthropic"
COHERE = "cohere"
AI21LABS = "ai21labs"
MLFLOW_MODEL_SERVING = "mlflow-model-serving"
MOSAICML = "mosaicml"
HUGGINGFACE_TEXT_GENERATION_INFERENCE = "huggingface-text-generation-inference"
PALM = "palm"
BEDROCK = "bedrock"
AMAZON_BEDROCK = "amazon-bedrock" # an alias for bedrock
# Note: The following providers are only supported on Databricks
DATABRICKS_MODEL_SERVING = "databricks-model-serving"
DATABRICKS = "databricks"
MISTRAL = "mistral"
TOGETHERAI = "togetherai"
[docs] @classmethod
def values(cls):
return {p.value for p in cls}
[docs]class TogetherAIConfig(ConfigModel):
togetherai_api_key: str
[docs] @validator("togetherai_api_key", pre=True)
def validate_togetherai_api_key(cls, value):
return _resolve_api_key_from_input(value)
[docs]class RouteType(str, Enum):
LLM_V1_COMPLETIONS = "llm/v1/completions"
LLM_V1_CHAT = "llm/v1/chat"
LLM_V1_EMBEDDINGS = "llm/v1/embeddings"
[docs]class CohereConfig(ConfigModel):
cohere_api_key: str
[docs] @validator("cohere_api_key", pre=True)
def validate_cohere_api_key(cls, value):
return _resolve_api_key_from_input(value)
[docs]class AI21LabsConfig(ConfigModel):
ai21labs_api_key: str
[docs] @validator("ai21labs_api_key", pre=True)
def validate_ai21labs_api_key(cls, value):
return _resolve_api_key_from_input(value)
[docs]class MosaicMLConfig(ConfigModel):
mosaicml_api_key: str
mosaicml_api_base: Optional[str] = None
[docs] @validator("mosaicml_api_key", pre=True)
def validate_mosaicml_api_key(cls, value):
return _resolve_api_key_from_input(value)
[docs]class OpenAIAPIType(str, Enum):
OPENAI = "openai"
AZURE = "azure"
AZUREAD = "azuread"
@classmethod
def _missing_(cls, value):
"""
Implements case-insensitive matching of API type strings
"""
for api_type in cls:
if api_type.value == value.lower():
return api_type
raise MlflowException.invalid_parameter_value(f"Invalid OpenAI API type '{value}'")
[docs]class OpenAIConfig(ConfigModel):
openai_api_key: str
openai_api_type: OpenAIAPIType = OpenAIAPIType.OPENAI
openai_api_base: Optional[str] = None
openai_api_version: Optional[str] = None
openai_deployment_name: Optional[str] = None
openai_organization: Optional[str] = None
[docs] @validator("openai_api_key", pre=True)
def validate_openai_api_key(cls, value):
return _resolve_api_key_from_input(value)
@classmethod
def _validate_field_compatibility(cls, info: dict[str, Any]):
if not isinstance(info, dict):
return info
api_type = (info.get("openai_api_type") or OpenAIAPIType.OPENAI).lower()
if api_type == OpenAIAPIType.OPENAI:
if info.get("openai_deployment_name") is not None:
raise MlflowException.invalid_parameter_value(
f"OpenAI route configuration can only specify a value for "
f"'openai_deployment_name' if 'openai_api_type' is '{OpenAIAPIType.AZURE}' "
f"or '{OpenAIAPIType.AZUREAD}'. Found type: '{api_type}'"
)
if info.get("openai_api_base") is None:
info["openai_api_base"] = "https://api.openai.com/v1"
elif api_type in (OpenAIAPIType.AZURE, OpenAIAPIType.AZUREAD):
if info.get("openai_organization") is not None:
raise MlflowException.invalid_parameter_value(
f"OpenAI route configuration can only specify a value for "
f"'openai_organization' if 'openai_api_type' is '{OpenAIAPIType.OPENAI}'"
)
base_url = info.get("openai_api_base")
deployment_name = info.get("openai_deployment_name")
api_version = info.get("openai_api_version")
if (base_url, deployment_name, api_version).count(None) > 0:
raise MlflowException.invalid_parameter_value(
f"OpenAI route configuration must specify 'openai_api_base', "
f"'openai_deployment_name', and 'openai_api_version' if 'openai_api_type' is "
f"'{OpenAIAPIType.AZURE}' or '{OpenAIAPIType.AZUREAD}'."
)
else:
raise MlflowException.invalid_parameter_value(f"Invalid OpenAI API type '{api_type}'")
return info
if IS_PYDANTIC_V2_OR_NEWER:
from pydantic import model_validator as _model_validator
@_model_validator(mode="before")
def validate_field_compatibility(cls, info: dict[str, Any]):
return cls._validate_field_compatibility(info)
else:
from pydantic import root_validator as _root_validator
[docs] @_root_validator(pre=False)
def validate_field_compatibility(cls, config: dict[str, Any]):
return cls._validate_field_compatibility(config)
[docs]class AnthropicConfig(ConfigModel):
anthropic_api_key: str
anthropic_version: str = "2023-06-01"
[docs] @validator("anthropic_api_key", pre=True)
def validate_anthropic_api_key(cls, value):
return _resolve_api_key_from_input(value)
[docs]class PaLMConfig(ConfigModel):
palm_api_key: str
[docs] @validator("palm_api_key", pre=True)
def validate_palm_api_key(cls, value):
return _resolve_api_key_from_input(value)
[docs]class MlflowModelServingConfig(ConfigModel):
model_server_url: str
# Workaround to suppress warning that Pydantic raises when a field name starts with "model_".
# https://github.com/mlflow/mlflow/issues/10335
model_config = pydantic.ConfigDict(protected_namespaces=())
[docs]class HuggingFaceTextGenerationInferenceConfig(ConfigModel):
hf_server_url: str
[docs]class AWSBaseConfig(pydantic.BaseModel):
aws_region: Optional[str] = None
[docs]class AWSRole(AWSBaseConfig):
aws_role_arn: str
session_length_seconds: int = 15 * 60
[docs]class AWSIdAndKey(AWSBaseConfig):
aws_access_key_id: str
aws_secret_access_key: str
aws_session_token: Optional[str] = None
[docs]class AmazonBedrockConfig(ConfigModel):
# order here is important, at least for pydantic<2
aws_config: Union[AWSRole, AWSIdAndKey, AWSBaseConfig]
[docs]class MistralConfig(ConfigModel):
mistral_api_key: str
[docs] @validator("mistral_api_key", pre=True)
def validate_mistral_api_key(cls, value):
return _resolve_api_key_from_input(value)
[docs]class ModelInfo(ResponseModel):
name: Optional[str] = None
provider: Provider
def _resolve_api_key_from_input(api_key_input):
"""
Resolves the provided API key.
Input formats accepted:
- Path to a file as a string which will have the key loaded from it
- environment variable name that stores the api key
- the api key itself
"""
if not isinstance(api_key_input, str):
raise MlflowException.invalid_parameter_value(
"The api key provided is not a string. Please provide either an environment "
"variable key, a path to a file containing the api key, or the api key itself"
)
# try reading as an environment variable
if api_key_input.startswith("$"):
env_var_name = api_key_input[1:]
if env_var := os.getenv(env_var_name):
return env_var
else:
raise MlflowException.invalid_parameter_value(
f"Environment variable {env_var_name!r} is not set"
)
# try reading from a local path
file = pathlib.Path(api_key_input)
if file.is_file():
return file.read_text()
# if the key itself is passed, return
return api_key_input
[docs]class Model(ConfigModel):
name: Optional[str] = None
provider: Union[str, Provider]
if IS_PYDANTIC_V2_OR_NEWER:
config: Optional[SerializeAsAny[ConfigModel]] = None
else:
config: Optional[ConfigModel] = None
[docs] @validator("provider", pre=True)
def validate_provider(cls, value):
from mlflow.gateway.provider_registry import provider_registry
if isinstance(value, Provider):
return value
formatted_value = value.replace("-", "_").upper()
if formatted_value in Provider.__members__:
return Provider[formatted_value]
if value in provider_registry.keys():
return value
raise MlflowException.invalid_parameter_value(f"The provider '{value}' is not supported.")
@classmethod
def _validate_config(cls, info, values):
from mlflow.gateway.provider_registry import provider_registry
if provider := values.get("provider"):
config_type = provider_registry.get(provider).CONFIG_TYPE
return config_type(**info)
raise MlflowException.invalid_parameter_value(
"A provider must be provided for each gateway route."
)
if IS_PYDANTIC_V2_OR_NEWER:
@validator("config", pre=True)
def validate_config(cls, info, values):
return cls._validate_config(info, values)
else:
[docs] @validator("config", pre=True)
def validate_config(cls, config, values):
return cls._validate_config(config, values)
[docs]class AliasedConfigModel(ConfigModel):
"""
Enables use of field aliases in a configuration model for backwards compatibility
"""
if Version(pydantic.__version__) >= Version("2.0"):
model_config = ConfigDict(populate_by_name=True)
else:
class Config:
allow_population_by_field_name = True
[docs]class Limit(LimitModel):
calls: int
key: Optional[str] = None
renewal_period: str
[docs]class LimitsConfig(ConfigModel):
limits: Optional[list[Limit]] = []
[docs]class RouteConfig(AliasedConfigModel):
name: str
route_type: RouteType = Field(alias="endpoint_type")
model: Model
limit: Optional[Limit] = None
[docs] @validator("name")
def validate_endpoint_name(cls, route_name):
if not is_valid_endpoint_name(route_name):
raise MlflowException.invalid_parameter_value(
"The route name provided contains disallowed characters for a url endpoint. "
f"'{route_name}' is invalid. Names cannot contain spaces or any non "
"alphanumeric characters other than hyphen and underscore."
)
return route_name
[docs] @validator("model", pre=True)
def validate_model(cls, model):
if model:
model_instance = Model(**model)
if model_instance.provider in Provider.values() and model_instance.config is None:
raise MlflowException.invalid_parameter_value(
"A config must be supplied when setting a provider. The provider entry for "
f"{model_instance.provider} is incorrect."
)
return model
[docs] @root_validator(skip_on_failure=True)
def validate_route_type_and_model_name(cls, values):
route_type = values.get("route_type")
model = values.get("model")
if (
model
and model.provider == "mosaicml"
and route_type == RouteType.LLM_V1_CHAT
and not is_valid_mosiacml_chat_model(model.name)
):
raise MlflowException.invalid_parameter_value(
f"An invalid model has been specified for the chat route. '{model.name}'. "
f"Ensure the model selected starts with one of: "
f"{MLFLOW_AI_GATEWAY_MOSAICML_CHAT_SUPPORTED_MODEL_PREFIXES}"
)
if model and model.provider == "ai21labs" and not is_valid_ai21labs_model(model.name):
raise MlflowException.invalid_parameter_value(
f"An Unsupported AI21Labs model has been specified: '{model.name}'. "
f"Please see documentation for supported models."
)
return values
[docs] @validator("route_type", pre=True)
def validate_route_type(cls, value):
if value in RouteType._value2member_map_:
return value
raise MlflowException.invalid_parameter_value(f"The route_type '{value}' is not supported.")
[docs] @validator("limit", pre=True)
def validate_limit(cls, value):
from limits import parse
if value:
limit = Limit(**value)
try:
parse(f"{limit.calls}/{limit.renewal_period}")
except ValueError:
raise MlflowException.invalid_parameter_value(
"Failed to parse the rate limit configuration."
"Please make sure limit.calls is a positive number and"
"limit.renewal_period is a right granularity"
)
return value
[docs] def to_route(self) -> "Route":
return Route(
name=self.name,
route_type=self.route_type,
model=RouteModelInfo(
name=self.model.name,
provider=self.model.provider,
),
route_url=f"{MLFLOW_GATEWAY_ROUTE_BASE}{self.name}{MLFLOW_QUERY_SUFFIX}",
limit=self.limit,
)
[docs]class RouteModelInfo(ResponseModel):
name: Optional[str] = None
# Use `str` instead of `Provider` enum to allow gateway backends such as Databricks to
# support new providers without breaking the gateway client.
provider: str
_ROUTE_EXTRA_SCHEMA = {
"example": {
"name": "openai-completions",
"route_type": "llm/v1/completions",
"model": {
"name": "gpt-4o-mini",
"provider": "openai",
},
"route_url": "/gateway/routes/completions/invocations",
}
}
[docs]class Route(ConfigModel):
name: str
route_type: str
model: RouteModelInfo
route_url: str
limit: Optional[Limit] = None
[docs] class Config:
if IS_PYDANTIC_V2_OR_NEWER:
json_schema_extra = _ROUTE_EXTRA_SCHEMA
else:
schema_extra = _ROUTE_EXTRA_SCHEMA
[docs] def to_endpoint(self):
from mlflow.deployments.server.config import Endpoint
return Endpoint(
name=self.name,
endpoint_type=self.route_type,
model=self.model,
endpoint_url=self.route_url,
limit=self.limit,
)
[docs]class GatewayConfig(AliasedConfigModel):
routes: list[RouteConfig] = Field(alias="endpoints")
def _load_route_config(path: Union[str, Path]) -> GatewayConfig:
"""
Reads the gateway configuration yaml file from the storage location and returns an instance
of the configuration RouteConfig class
"""
if isinstance(path, str):
path = Path(path)
try:
configuration = yaml.safe_load(path.read_text())
except Exception as e:
raise MlflowException.invalid_parameter_value(
f"The file at {path} is not a valid yaml file"
) from e
check_configuration_deprecated_fields(configuration)
check_configuration_route_name_collisions(configuration)
try:
return GatewayConfig(**configuration)
except ValidationError as e:
raise MlflowException.invalid_parameter_value(
f"The gateway configuration is invalid: {e}"
) from e
def _save_route_config(config: GatewayConfig, path: Union[str, Path]) -> None:
if isinstance(path, str):
path = Path(path)
path.write_text(yaml.safe_dump(json.loads(json.dumps(config.dict(), default=pydantic_encoder))))
def _validate_config(config_path: str) -> GatewayConfig:
if not os.path.exists(config_path):
raise MlflowException.invalid_parameter_value(f"{config_path} does not exist")
try:
return _load_route_config(config_path)
except ValidationError as e:
raise MlflowException.invalid_parameter_value(f"Invalid gateway configuration: {e}") from e