[1]:
from pprint import pprint
from sklearn.ensemble import RandomForestRegressor
from mlflow import MlflowClient
Initializing the MLflow Client
Depending on where you are running this notebook, your configuration may vary for how you initialize the MLflow Client in the following cell.
For this example, we’re using a locally running tracking server, but other options are available (the easiest is to use the free managed service within Databricks Community Edition).
Please see the guide to running notebooks here for more information on setting the tracking server uri and configuring access to either managed or self-managed MLflow tracking servers.
[2]:
# NOTE: review the links mentioned above for guidance on connecting to a managed tracking server, such as the free Databricks Community Edition
# The client below is pointed at a tracking server on localhost, port 8080;
# change tracking_uri if your server is hosted elsewhere.
client = MlflowClient(tracking_uri="http://127.0.0.1:8080")
Search Experiments with the MLflow Client API
Let’s take a look at the Default Experiment that is created for us.
This safe ‘fallback’ experiment will store Runs that we create if we don’t specify a new experiment.
[3]:
# Search experiments without providing query terms behaves effectively as a 'list' action
# The result is kept in `all_experiments` so that the next cell can filter it by name.
all_experiments = client.search_experiments()
print(all_experiments)
[<Experiment: artifact_location='./mlruns/0', creation_time=None, experiment_id='0', last_update_time=None, lifecycle_stage='active', name='Default', tags={}>]
[4]:
# Keep only the experiments named "Default", then summarise the first match
# by its name and lifecycle_stage (raises IndexError if there is no match).
default_matches = [experiment for experiment in all_experiments if experiment.name == "Default"]
default_experiment = {
    "name": default_matches[0].name,
    "lifecycle_stage": default_matches[0].lifecycle_stage,
}

pprint(default_experiment)
{'lifecycle_stage': 'active', 'name': 'Default'}
Creating a new Experiment
In this section, we’ll:
create a new MLflow Experiment
apply metadata in the form of Experiment Tags
[5]:
# Free-text description; stored under the "mlflow.note.content" tag key below.
experiment_description = (
    "This is the grocery forecasting project. "
    "This experiment contains the produce models for apples."
)

# Searchable metadata attached to the experiment at creation time.
experiment_tags = {
    "project_name": "grocery-forecasting",
    "store_dept": "produce",
    "team": "stores-ml",
    "project_quarter": "Q3-2023",
    "mlflow.note.content": experiment_description,
}

# Experiment names must be unique on the tracking server, so calling
# create_experiment() a second time (e.g. after Restart & Run All) fails.
# Look the experiment up first and reuse its ID when it already exists, which
# keeps this cell safe to re-run. Either branch leaves the experiment ID in
# `produce_apples_experiment`, matching what create_experiment() returns.
existing_apples_experiment = client.get_experiment_by_name("Apple_Models")
if existing_apples_experiment is None:
    produce_apples_experiment = client.create_experiment(name="Apple_Models", tags=experiment_tags)
else:
    produce_apples_experiment = existing_apples_experiment.experiment_id
[6]:
# Filter experiments on the value of the project_name tag; the tag key is
# wrapped in backticks inside the filter expression.
project_name_filter = "tags.`project_name` = 'grocery-forecasting'"
apples_experiment = client.search_experiments(filter_string=project_name_filter)

# Show the first experiment returned by the search
pprint(apples_experiment[0])
<Experiment: artifact_location='mlflow-artifacts:/977454266300166282', creation_time=1696346036899, experiment_id='977454266300166282', last_update_time=1696346036899, lifecycle_stage='active', name='Apple_Models', tags={'mlflow.note.content': 'This is the grocery forecasting project. This '
'experiment contains the produce models for apples.',
'project_name': 'grocery-forecasting',
'project_quarter': 'Q3-2023',
'store_dept': 'produce',
'team': 'stores-ml'}>
[7]:
# Access individual tag data
# `tags` on the returned Experiment is indexed by tag key, here "team".
print(apples_experiment[0].tags["team"])
stores-ml
Running our first model training
In this section, we’ll:
create a synthetic data set that is relevant to a simple demand forecasting task
start an MLflow run
log metrics, parameters, and tags to the run
save the model to the run
register the model during model logging
Synthetic data generator for demand of apples
Keep in mind that this is purely for demonstration purposes.
The demand value is purely artificial and is deliberately covariant with the features. This is not a particularly realistic real-world scenario (if it were, we wouldn’t need Data Scientists!).
[8]:
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
def generate_apple_sales_data_with_promo_adjustment(
    base_demand: int = 1000, n_rows: int = 5000
) -> pd.DataFrame:
    """
    Generates a synthetic dataset for predicting apple sales demand with seasonality and inflation.

    This function creates a pandas DataFrame with features relevant to apple sales.
    The features include date, average_temperature, rainfall, weekend flag, holiday flag,
    promotional flag, price_per_kg, and the previous day's demand. The target variable,
    'demand', is generated based on a combination of these features with some added noise.

    The NumPy global random state is seeded with a fixed value on every call, so the
    random feature values are repeatable. The dates are anchored on the current
    time, so the date column (and the weekend/month values derived from it) depend on
    when the function is called.

    Args:
        base_demand (int, optional): Base demand for apples. Defaults to 1000.
        n_rows (int, optional): Number of rows (days) of data to generate. Defaults to 5000.

    Returns:
        pd.DataFrame: DataFrame with features and target variable for apple sales prediction.

    Example:
        >>> df = generate_apple_sales_data_with_promo_adjustment(base_demand=1200, n_rows=6000)
        >>> df.head()
    """
    # Set seed for reproducibility (note: this reseeds NumPy's global generator)
    np.random.seed(9999)

    # Create date range. The end timestamp is captured once so that consecutive
    # rows are exactly one day apart instead of drifting by the time taken to
    # build the list.
    end_date = datetime.now()
    dates = [end_date - timedelta(days=i) for i in range(n_rows)]
    dates.reverse()

    # Generate features. The order of the random draws is significant: it fixes
    # which values each column receives for the seed above.
    df = pd.DataFrame(
        {
            "date": dates,
            "average_temperature": np.random.uniform(10, 35, n_rows),
            "rainfall": np.random.exponential(5, n_rows),
            "weekend": [int(date.weekday() >= 5) for date in dates],
            "holiday": np.random.choice([0, 1], n_rows, p=[0.97, 0.03]),
            "price_per_kg": np.random.uniform(0.5, 3, n_rows),
            "month": [date.month for date in dates],
        }
    )

    # Introduce inflation over time (years): 3% per year since the earliest year
    df["inflation_multiplier"] = 1 + (df["date"].dt.year - df["date"].dt.year.min()) * 0.03

    # Incorporate seasonality due to apple harvests
    df["harvest_effect"] = np.sin(2 * np.pi * (df["month"] - 3) / 12) + np.sin(
        2 * np.pi * (df["month"] - 9) / 12
    )

    # Modify the price_per_kg based on harvest effect
    df["price_per_kg"] = df["price_per_kg"] - df["harvest_effect"] * 0.5

    # Adjust promo periods to coincide with periods lagging peak harvest by 1 month
    peak_months = [4, 10]  # months following the peak availability
    df["promo"] = np.where(
        df["month"].isin(peak_months),
        1,
        np.random.choice([0, 1], n_rows, p=[0.85, 0.15]),
    )

    # Generate target variable based on features
    base_price_effect = -df["price_per_kg"] * 50
    seasonality_effect = df["harvest_effect"] * 50
    promo_effect = df["promo"] * 200

    df["demand"] = (
        base_demand
        + base_price_effect
        + seasonality_effect
        + promo_effect
        + df["weekend"] * 300
        + np.random.normal(0, 50, n_rows)  # adding random noise
    ) * df["inflation_multiplier"]

    # Add previous day's demand. The first row has no predecessor, so it is
    # back-filled from the following row. The result is assigned directly to the
    # column: an in-place fillna on a column selection does not reliably write
    # back to the parent frame, and fillna(method=...) is deprecated.
    df["previous_days_demand"] = df["demand"].shift(1).bfill()

    # Drop temporary columns
    return df.drop(columns=["inflation_multiplier", "harvest_effect", "month"])
[9]:
# Build a 1,000-row synthetic dataset and preview its last 20 rows
data = generate_apple_sales_data_with_promo_adjustment(n_rows=1_000, base_demand=1_000)
data.tail(20)
[9]:
date | average_temperature | rainfall | weekend | holiday | price_per_kg | promo | demand | previous_days_demand | |
---|---|---|---|---|---|---|---|---|---|
980 | 2023-09-14 11:13:56.948267 | 34.130183 | 1.454065 | 0 | 0 | 1.449177 | 0 | 971.802447 | 1001.085782 |
981 | 2023-09-15 11:13:56.948267 | 32.353643 | 9.462859 | 0 | 0 | 2.856503 | 0 | 818.951553 | 971.802447 |
982 | 2023-09-16 11:13:56.948266 | 18.816833 | 0.391470 | 1 | 0 | 1.326429 | 0 | 1281.352029 | 818.951553 |
983 | 2023-09-17 11:13:56.948265 | 34.533012 | 2.120477 | 1 | 0 | 0.970131 | 0 | 1357.385504 | 1281.352029 |
984 | 2023-09-18 11:13:56.948265 | 23.057202 | 2.365705 | 0 | 0 | 1.049931 | 0 | 991.427049 | 1357.385504 |
985 | 2023-09-19 11:13:56.948264 | 34.810165 | 3.089005 | 0 | 0 | 2.035149 | 0 | 974.971149 | 991.427049 |
986 | 2023-09-20 11:13:56.948263 | 29.208905 | 3.673292 | 0 | 0 | 2.518098 | 0 | 1056.249547 | 974.971149 |
987 | 2023-09-21 11:13:56.948263 | 16.428676 | 4.077782 | 0 | 0 | 1.268979 | 0 | 1063.118915 | 1056.249547 |
988 | 2023-09-22 11:13:56.948262 | 32.067512 | 2.734454 | 0 | 0 | 0.762317 | 0 | 1040.492007 | 1063.118915 |
989 | 2023-09-23 11:13:56.948261 | 31.938203 | 13.883486 | 1 | 0 | 1.153301 | 0 | 1285.040470 | 1040.492007 |
990 | 2023-09-24 11:13:56.948261 | 18.024055 | 7.544061 | 1 | 0 | 0.610703 | 0 | 1366.644564 | 1285.040470 |
991 | 2023-09-25 11:13:56.948260 | 20.681067 | 18.820490 | 0 | 0 | 1.533488 | 0 | 973.934924 | 1366.644564 |
992 | 2023-09-26 11:13:56.948259 | 16.010132 | 7.705941 | 0 | 0 | 1.632498 | 1 | 1188.291256 | 973.934924 |
993 | 2023-09-27 11:13:56.948259 | 18.766455 | 6.274840 | 0 | 0 | 2.806554 | 0 | 930.089438 | 1188.291256 |
994 | 2023-09-28 11:13:56.948258 | 27.948793 | 23.705246 | 0 | 0 | 0.829464 | 0 | 1060.576311 | 930.089438 |
995 | 2023-09-29 11:13:56.948257 | 28.661072 | 10.329865 | 0 | 0 | 2.290591 | 0 | 910.690776 | 1060.576311 |
996 | 2023-09-30 11:13:56.948256 | 10.821693 | 3.575645 | 1 | 0 | 0.897473 | 0 | 1306.363801 | 910.690776 |
997 | 2023-10-01 11:13:56.948256 | 21.108560 | 6.221089 | 1 | 0 | 1.093864 | 1 | 1564.422372 | 1306.363801 |
998 | 2023-10-02 11:13:56.948254 | 29.451301 | 5.021463 | 0 | 0 | 2.493085 | 1 | 1164.303256 | 1564.422372 |
999 | 2023-10-03 11:13:56.948248 | 19.261458 | 0.438381 | 0 | 0 | 2.610422 | 1 | 1067.963448 | 1164.303256 |
Train and log the model
We’re now ready to import our model class and train a RandomForestRegressor.
[10]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
import mlflow
# Configure the fluent API: point it at the tracking server first...
mlflow.set_tracking_uri(uri="http://127.0.0.1:8080")

# ...then make "Apple_Models" the active experiment. The call returns the
# Experiment metadata, which is kept for later reference.
apple_experiment = mlflow.set_experiment(experiment_name="Apple_Models")

# run_name labels this training iteration (a unique name is auto-generated when
# none is supplied); artifact_path is where the model will be saved to.
run_name, artifact_path = "apples_rf_test", "rf_apples"
[11]:
# Features are every column except the date (not used for training) and the target
X = data.drop(["date", "demand"], axis=1)
y = data["demand"]

# Hold out 20% of the rows as a validation set
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Hyperparameters for the forest; the same mapping is logged to MLflow below
params = dict(
    n_estimators=100,
    max_depth=6,
    min_samples_split=10,
    min_samples_leaf=4,
    bootstrap=True,
    oob_score=False,
    random_state=888,
)

# Build the regressor and fit it on the training split (fit returns the estimator)
rf = RandomForestRegressor(**params).fit(X_train, y_train)

# Score the held-out rows
y_pred = rf.predict(X_val)

# Validation error metrics
mae = mean_absolute_error(y_val, y_pred)
mse = mean_squared_error(y_val, y_pred)
rmse = np.sqrt(mse)
r2 = r2_score(y_val, y_pred)

# Collect the metrics under the names they will be logged with
metrics = dict(mae=mae, mse=mse, rmse=rmse, r2=r2)

# Everything logged inside this context is attached to a single MLflow run
with mlflow.start_run(run_name=run_name) as run:
    # Hyperparameters used for the fit
    mlflow.log_params(params)

    # Validation metrics computed above
    mlflow.log_metrics(metrics)

    # The fitted model itself, with the validation features as the input example
    mlflow.sklearn.log_model(sk_model=rf, input_example=X_val, artifact_path=artifact_path)
/Users/benjamin.wilson/miniconda3/envs/mlflow-dev-env/lib/python3.8/site-packages/mlflow/models/signature.py:333: UserWarning: Hint: Inferred schema contains integer column(s). Integer columns in Python cannot represent missing values. If your input data contains missing values at inference time, it will be encoded as floats and will cause a schema enforcement error. The best way to avoid this problem is to infer the model schema based on a realistic data sample (training dataset) that includes missing values. Alternatively, you can declare integer columns as doubles (float64) whenever these columns may have missing values. See `Handling Integers With Missing Values <https://www.mlflow.org/docs/latest/models.html#handling-integers-with-missing-values>`_ for more details.
input_schema = _infer_schema(input_ex)
/Users/benjamin.wilson/miniconda3/envs/mlflow-dev-env/lib/python3.8/site-packages/_distutils_hack/__init__.py:30: UserWarning: Setuptools is replacing distutils.
warnings.warn("Setuptools is replacing distutils.")