
Prophet with MLflow

In this comprehensive guide, we'll explore how to use Prophet with MLflow for time series forecasting, experiment tracking, and model deployment. We'll cover everything from basic forecasting workflows to advanced business scenario modeling and production deployment patterns.

Quick Start with Prophet and MLflow​

Prophet works seamlessly with MLflow to track your forecasting experiments:

import mlflow
import mlflow.prophet
import pandas as pd
import numpy as np
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics

# Load sample time series data (Prophet expects 'ds' and 'y' columns)
# This example uses the classic Peyton Manning Wikipedia page views dataset
url = "https://raw.githubusercontent.com/facebook/prophet/main/examples/example_wp_log_peyton_manning.csv"
df = pd.read_csv(url)

print(f"Data shape: {df.shape}")
print(f"Date range: {df['ds'].min()} to {df['ds'].max()}")
print(f"Data preview:\n{df.head()}")

with mlflow.start_run(run_name="Basic Prophet Forecast"):
    # Create Prophet model with specific parameters
    model = Prophet(
        changepoint_prior_scale=0.05,  # Flexibility of trend changes
        seasonality_prior_scale=10,  # Strength of seasonality
        holidays_prior_scale=10,  # Strength of holiday effects
        yearly_seasonality=True,
        weekly_seasonality=True,
        daily_seasonality=False,
    )

    # Fit the model
    model.fit(df)

    # Extract and log model parameters
    def extract_prophet_params(prophet_model):
        """Extract Prophet model parameters for logging."""
        from prophet.serialize import SIMPLE_ATTRIBUTES

        params = {}
        for attr in SIMPLE_ATTRIBUTES:
            if hasattr(prophet_model, attr):
                value = getattr(prophet_model, attr)
                if isinstance(value, (int, float, str, bool)):
                    params[attr] = value
        return params

    params = extract_prophet_params(model)
    mlflow.log_params(params)

    # Create future dataframe for forecasting
    future = model.make_future_dataframe(periods=365)  # Forecast 1 year ahead
    forecast = model.predict(future)

    # Cross-validation for model evaluation
    cv_results = cross_validation(
        model,
        initial="730 days",  # Initial training period
        period="180 days",  # Spacing between cutoff dates
        horizon="365 days",  # Forecast horizon
        parallel="threads",  # Use threading for speed
    )

    # Calculate performance metrics
    metrics = performance_metrics(cv_results)
    avg_metrics = metrics[["mse", "rmse", "mae", "mape"]].mean().to_dict()
    mlflow.log_metrics(avg_metrics)

    # Log the model with input example
    mlflow.prophet.log_model(
        pr_model=model, name="prophet_model", input_example=df[["ds"]].head(10)
    )

    print("Model trained and logged successfully!")
    # Prophet reports MAPE as a fraction, so format it as a percentage
    print(f"Average MAPE: {avg_metrics['mape']:.2%}")

This example automatically captures:

  • All Prophet model parameters and configuration
  • Cross-validation performance metrics
  • The trained model ready for deployment
  • Sample input data for model documentation
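
A quick way to sanity-check the logged artifact is to load it straight back from the run. A minimal sketch, assuming it executes in the same session as the code above so that `mlflow.last_active_run()` still points at the run that logged the model:

# Reload the model logged above and sanity-check a short forecast
run_id = mlflow.last_active_run().info.run_id
loaded_model = mlflow.prophet.load_model(f"runs:/{run_id}/prophet_model")

future = loaded_model.make_future_dataframe(periods=30)
print(loaded_model.predict(future)[["ds", "yhat", "yhat_lower", "yhat_upper"]].tail())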

Understanding Prophet's Data Requirements​

Prophet has specific data format requirements that are important to understand:

Data Format and Preparation​

import pandas as pd
from datetime import datetime, timedelta


def prepare_prophet_data(data, date_col, value_col, freq="D"):
    """
    Prepare data for Prophet training.

    Args:
        data: DataFrame with time series data
        date_col: Name of date column
        value_col: Name of value column
        freq: Frequency of the time series
    """

    # Prophet requires columns named 'ds' (datestamp) and 'y' (value)
    prophet_df = data[[date_col, value_col]].copy()
    prophet_df.columns = ["ds", "y"]

    # Ensure ds is datetime
    prophet_df["ds"] = pd.to_datetime(prophet_df["ds"])

    # Sort by date
    prophet_df = prophet_df.sort_values("ds").reset_index(drop=True)

    # Handle missing dates if needed
    if freq:
        full_date_range = pd.date_range(
            start=prophet_df["ds"].min(), end=prophet_df["ds"].max(), freq=freq
        )

        # Reindex to fill missing dates
        prophet_df = prophet_df.set_index("ds").reindex(full_date_range).reset_index()
        prophet_df.columns = ["ds", "y"]

        # Log data quality metrics
        missing_dates = prophet_df["y"].isna().sum()
        print(f"Missing dates filled: {missing_dates}")

    return prophet_df


# Example usage
# Assuming you have a DataFrame with 'date' and 'sales' columns
# df_prepared = prepare_prophet_data(raw_data, 'date', 'sales', freq='D')

Handling Different Time Series Patterns​


Multiple Time Series​

def prepare_multiple_series(data, date_col, value_col, series_col):
    """Prepare multiple time series for separate Prophet models."""

    results = {}

    for series_name in data[series_col].unique():
        series_data = data[data[series_col] == series_name]
        prophet_data = prepare_prophet_data(series_data, date_col, value_col)
        results[series_name] = prophet_data

    return results


# Train separate models for each series
def train_multiple_prophet_models(series_dict):
    """Train Prophet models for multiple time series."""

    models = {}

    with mlflow.start_run(run_name="Multiple Series Forecasting"):
        for series_name, data in series_dict.items():
            with mlflow.start_run(run_name=f"Series_{series_name}", nested=True):
                model = Prophet()
                model.fit(data)

                # Log series-specific metrics
                mlflow.log_param("series_name", series_name)
                mlflow.log_param("data_points", len(data))

                models[series_name] = model

                # Log individual model
                mlflow.prophet.log_model(pr_model=model, name=f"model_{series_name}")

    return models
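
A hypothetical invocation, assuming a long-format DataFrame `raw_data` with date, sales, and store identifier columns (the column names are illustrative):

# Usage (column names are illustrative)
# series_dict = prepare_multiple_series(raw_data, "date", "sales", "store_id")
# models = train_multiple_prophet_models(series_dict)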

Irregular Time Series​

def handle_irregular_timeseries(df, min_frequency="W"):
    """Handle irregular time series data by aggregating to a regular frequency."""

    # Aggregate to regular frequency if needed
    df["ds"] = pd.to_datetime(df["ds"])
    df.set_index("ds", inplace=True)

    # Resample to the requested frequency (e.g., "W" for weekly, "M" for monthly);
    # use 'sum' or 'mean' depending on your use case
    df_regular = df.resample(min_frequency).agg({"y": "sum"}).reset_index()

    # Remove any remaining NaN values
    df_regular = df_regular.dropna()

    return df_regular
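
For example, to roll an irregular daily series up to weekly totals (assuming df already uses the 'ds'/'y' schema):

# Usage
# weekly_df = handle_irregular_timeseries(df.copy(), min_frequency="W")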

Advanced Prophet Configuration​

Seasonality and Trend Configuration​

def advanced_prophet_configuration(df):
    """Demonstrate advanced Prophet configuration options."""

    with mlflow.start_run(run_name="Advanced Prophet Configuration"):
        # Create Prophet model with advanced settings
        model = Prophet(
            # Trend configuration
            growth="logistic",  # or 'linear'
            changepoints=None,  # Let Prophet auto-detect, or specify dates
            n_changepoints=25,  # Number of potential changepoints
            changepoint_range=0.8,  # Proportion of history for changepoints
            changepoint_prior_scale=0.05,  # Flexibility of trend changes
            # Seasonality configuration
            yearly_seasonality="auto",  # or True/False/number
            weekly_seasonality="auto",
            daily_seasonality="auto",
            seasonality_mode="additive",  # or 'multiplicative'
            seasonality_prior_scale=10,
            # Holiday configuration
            holidays_prior_scale=10,
            # Uncertainty configuration
            interval_width=0.80,  # Width of uncertainty intervals
            uncertainty_samples=1000,  # Monte Carlo samples for uncertainty
            # Stan configuration
            mcmc_samples=0,  # Use MAP instead of MCMC
            stan_backend="CMDSTANPY",  # Stan backend
        )

        # For logistic growth, capacity must be set before fitting
        if model.growth == "logistic":
            df["cap"] = df["y"].max() * 1.2  # Set capacity 20% above max observed
            df["floor"] = 0  # Optional floor

        # Fit the model
        model.fit(df)

        # Log configuration parameters
        config_params = {
            "growth": model.growth,
            "n_changepoints": model.n_changepoints,
            "changepoint_range": model.changepoint_range,
            "seasonality_mode": model.seasonality_mode,
            "interval_width": model.interval_width,
        }
        mlflow.log_params(config_params)

    return model


# Usage
advanced_model = advanced_prophet_configuration(df)

Custom Seasonalities and Events​

def add_custom_components(model, df):
    """Add custom seasonalities and regressors to Prophet model."""

    with mlflow.start_run(run_name="Custom Prophet Components"):
        # Add custom seasonalities
        model.add_seasonality(
            name="monthly",
            period=30.5,  # Monthly seasonality
            fourier_order=5,  # Number of Fourier terms
        )

        model.add_seasonality(
            name="quarterly", period=91.25, fourier_order=8  # Quarterly seasonality
        )

        # Add conditional seasonalities (e.g., different patterns for weekdays/weekends);
        # note that the condition column must also be present in any future dataframe
        def is_weekend(ds):
            date = pd.to_datetime(ds)
            return date.weekday() >= 5

        df["weekend"] = df["ds"].apply(is_weekend)

        model.add_seasonality(
            name="weekend_seasonality",
            period=7,
            fourier_order=3,
            condition_name="weekend",
        )

        # Add external regressors
        # Example: Add economic indicator or marketing spend
        np.random.seed(42)
        df["marketing_spend"] = np.random.normal(1000, 200, len(df))
        df["economic_indicator"] = np.random.normal(50, 10, len(df))

        model.add_regressor("marketing_spend", prior_scale=0.5)
        model.add_regressor("economic_indicator", prior_scale=0.3)

        # Fit model with custom components
        model.fit(df)

        # Log custom component information (custom seasonalities live in
        # model.seasonalities, regressors in model.extra_regressors)
        mlflow.log_params(
            {
                "custom_seasonalities": ["monthly", "quarterly", "weekend_seasonality"],
                "external_regressors": ["marketing_spend", "economic_indicator"],
                "total_components": len(model.seasonalities)
                + len(model.extra_regressors),
            }
        )

    return model, df


# Usage
model_with_custom = Prophet()
model_with_custom, enhanced_df = add_custom_components(model_with_custom, df.copy())

Holiday and Event Modeling

Built-in Holiday Support​

from prophet.make_holidays import make_holidays_df


def add_holiday_effects():
    """Add holiday effects to Prophet model."""

    with mlflow.start_run(run_name="Holiday Modeling"):
        # Create holidays dataframe for specific countries
        holidays = make_holidays_df(
            year_list=list(range(2010, 2025)),
            country="US",  # Built-in support for many countries
        )

        # Add custom holidays/events
        custom_events = pd.DataFrame(
            {
                "holiday": "black_friday",
                "ds": pd.to_datetime(
                    ["2020-11-27", "2021-11-26", "2022-11-25", "2023-11-24"]
                ),
                "lower_window": -1,  # Effect starts 1 day before
                "upper_window": 2,  # Effect lasts 2 days after
            }
        )

        # Combine built-in and custom holidays; fill the window columns with 0
        # for the built-in holidays so the concatenated frame stays consistent
        all_holidays = pd.concat([holidays, custom_events]).fillna(
            {"lower_window": 0, "upper_window": 0}
        )

        # Create model with holidays
        model = Prophet(
            holidays=all_holidays,
            holidays_prior_scale=15,  # Increase for stronger holiday effects
        )

        model.fit(df)

        # Log holiday information
        mlflow.log_params(
            {
                "country_holidays": "US",
                "custom_events": ["black_friday"],
                "total_holidays": len(all_holidays),
                "holidays_prior_scale": 15,
            }
        )

    return model

Business Calendar Integration​

def create_business_calendar(start_date, end_date):
    """Create business-specific calendar events."""

    business_events = []

    # Quarterly business reviews
    for year in range(start_date.year, end_date.year + 1):
        for quarter in [1, 2, 3, 4]:
            if quarter == 1:
                date = f"{year}-03-31"
            elif quarter == 2:
                date = f"{year}-06-30"
            elif quarter == 3:
                date = f"{year}-09-30"
            else:
                date = f"{year}-12-31"

            business_events.append(
                {
                    "holiday": "quarterly_review",
                    "ds": pd.to_datetime(date),
                    "lower_window": -7,  # Week before
                    "upper_window": 0,
                }
            )

    # Annual planning periods
    for year in range(start_date.year, end_date.year + 1):
        business_events.append(
            {
                "holiday": "annual_planning",
                "ds": pd.to_datetime(f"{year}-11-15"),
                "lower_window": -14,  # Two weeks of planning
                "upper_window": 14,
            }
        )

    return pd.DataFrame(business_events)
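
One way to wire the business calendar into a model, reusing make_holidays_df from the previous example; the date range here is illustrative:

# Usage (dates are illustrative)
start, end = pd.Timestamp("2018-01-01"), pd.Timestamp("2024-12-31")
business_calendar = create_business_calendar(start, end)
us_holidays = make_holidays_df(year_list=list(range(2018, 2025)), country="US")

# Fill missing window columns with 0 so the concatenated frame stays consistent
combined_events = pd.concat([us_holidays, business_calendar]).fillna(
    {"lower_window": 0, "upper_window": 0}
)
business_model = Prophet(holidays=combined_events)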

Model Validation and Performance Assessment​

Cross-Validation Best Practices​

def comprehensive_model_validation(model, df):
    """Perform comprehensive Prophet model validation."""

    with mlflow.start_run(run_name="Comprehensive Model Validation"):
        # Multiple cross-validation configurations
        cv_configs = [
            {
                "name": "short_horizon",
                "initial": "365 days",
                "period": "90 days",
                "horizon": "90 days",
            },
            {
                "name": "medium_horizon",
                "initial": "730 days",
                "period": "180 days",
                "horizon": "180 days",
            },
            {
                "name": "long_horizon",
                "initial": "1095 days",
                "period": "180 days",
                "horizon": "365 days",
            },
        ]

        all_metrics = {}

        for config in cv_configs:
            try:
                # Perform cross-validation
                cv_results = cross_validation(
                    model,
                    initial=config["initial"],
                    period=config["period"],
                    horizon=config["horizon"],
                    parallel="threads",
                )

                # Calculate metrics
                metrics = performance_metrics(cv_results)
                avg_metrics = metrics[["mse", "rmse", "mae", "mape", "coverage"]].mean()

                # Store metrics with configuration prefix
                for metric, value in avg_metrics.items():
                    metric_name = f"{config['name']}_{metric}"
                    all_metrics[metric_name] = value
                    mlflow.log_metric(metric_name, value)

                # Log additional statistics
                mlflow.log_metrics(
                    {
                        f"{config['name']}_cv_folds": len(cv_results),
                        f"{config['name']}_mape_std": metrics["mape"].std(),
                    }
                )

            except Exception as e:
                print(f"Cross-validation failed for {config['name']}: {e}")
                mlflow.log_param(f"{config['name']}_error", str(e))

    return all_metrics


# Usage
validation_metrics = comprehensive_model_validation(model, df)

Forecast Quality Assessment​

import matplotlib.pyplot as plt


def analyze_forecast_quality(model, df):
    """Analyze forecast quality with visualizations."""

    with mlflow.start_run(run_name="Forecast Quality Analysis"):
        # Generate forecast
        future = model.make_future_dataframe(periods=365)
        if model.growth == "logistic":
            future["cap"] = df["cap"].iloc[-1]  # Use last known capacity
            future["floor"] = df["floor"].iloc[-1] if "floor" in df.columns else 0

        forecast = model.predict(future)

        # Component analysis
        fig = model.plot_components(forecast, figsize=(12, 8))
        plt.tight_layout()
        plt.savefig("forecast_components.png", dpi=300, bbox_inches="tight")
        mlflow.log_artifact("forecast_components.png")
        plt.close()

        # Forecast plot
        fig = model.plot(forecast, figsize=(12, 6))
        plt.title("Prophet Forecast")
        plt.tight_layout()
        plt.savefig("forecast_plot.png", dpi=300, bbox_inches="tight")
        mlflow.log_artifact("forecast_plot.png")
        plt.close()

        # Residual analysis
        # Get historical predictions
        historical_forecast = forecast[forecast["ds"] <= df["ds"].max()]
        residuals = (
            df.set_index("ds")["y"] - historical_forecast.set_index("ds")["yhat"]
        )

        # Plot residuals
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # Residuals over time
        axes[0, 0].plot(residuals.index, residuals.values)
        axes[0, 0].set_title("Residuals Over Time")
        axes[0, 0].set_xlabel("Date")
        axes[0, 0].set_ylabel("Residual")

        # Residual distribution
        axes[0, 1].hist(residuals.values, bins=30, alpha=0.7)
        axes[0, 1].set_title("Residual Distribution")
        axes[0, 1].set_xlabel("Residual")
        axes[0, 1].set_ylabel("Frequency")

        # Q-Q plot
        from scipy import stats

        stats.probplot(residuals.values, dist="norm", plot=axes[1, 0])
        axes[1, 0].set_title("Q-Q Plot")

        # Residuals vs fitted
        axes[1, 1].scatter(historical_forecast["yhat"], residuals.values, alpha=0.6)
        axes[1, 1].set_title("Residuals vs Fitted")
        axes[1, 1].set_xlabel("Fitted Values")
        axes[1, 1].set_ylabel("Residuals")

        plt.tight_layout()
        plt.savefig("residual_analysis.png", dpi=300, bbox_inches="tight")
        mlflow.log_artifact("residual_analysis.png")
        plt.close()

        # Calculate residual statistics (the Ljung-Box test lives in
        # statsmodels, not scipy)
        from statsmodels.stats.diagnostic import acorr_ljungbox

        residual_stats = {
            "residual_mean": residuals.mean(),
            "residual_std": residuals.std(),
            "residual_skewness": stats.skew(residuals.values),
            "residual_kurtosis": stats.kurtosis(residuals.values),
            "ljung_box_pvalue": acorr_ljungbox(
                residuals.values, lags=10, return_df=True
            )["lb_pvalue"].iloc[-1],
        }

        mlflow.log_metrics(residual_stats)

    return forecast, residual_stats


# Usage
forecast_analysis, residual_stats = analyze_forecast_quality(model, df)

Hyperparameter Optimization​

Systematic Parameter Tuning​

import itertools
from sklearn.metrics import mean_absolute_percentage_error


def optimize_prophet_hyperparameters(df, param_grid=None):
    """Systematic hyperparameter optimization for Prophet."""

    if param_grid is None:
        param_grid = {
            "changepoint_prior_scale": [0.001, 0.01, 0.1, 0.5],
            "seasonality_prior_scale": [0.01, 0.1, 1.0, 10.0],
            "holidays_prior_scale": [0.01, 0.1, 1.0, 10.0],
            "seasonality_mode": ["additive", "multiplicative"],
        }

    # Generate all parameter combinations
    param_names = list(param_grid.keys())
    param_values = list(param_grid.values())
    param_combinations = list(itertools.product(*param_values))

    results = []

    with mlflow.start_run(run_name="Prophet Hyperparameter Optimization"):
        mlflow.log_param("total_combinations", len(param_combinations))

        for i, param_combo in enumerate(param_combinations):
            param_dict = dict(zip(param_names, param_combo))

            with mlflow.start_run(run_name=f"Config_{i+1}", nested=True):
                try:
                    # Create model with current parameters
                    model = Prophet(**param_dict)

                    # Time series split for validation
                    train_size = int(len(df) * 0.8)
                    train_df = df.iloc[:train_size]
                    test_df = df.iloc[train_size:]

                    # Fit model
                    model.fit(train_df)

                    # Predict on test set
                    future = model.make_future_dataframe(periods=len(test_df))
                    if model.growth == "logistic":
                        future["cap"] = df["cap"].iloc[-1]

                    forecast = model.predict(future)
                    test_forecast = forecast.iloc[-len(test_df) :]

                    # Calculate metrics
                    mape = mean_absolute_percentage_error(
                        test_df["y"], test_forecast["yhat"]
                    )
                    mae = np.mean(np.abs(test_df["y"] - test_forecast["yhat"]))
                    rmse = np.sqrt(np.mean((test_df["y"] - test_forecast["yhat"]) ** 2))

                    # Log parameters and metrics
                    mlflow.log_params(param_dict)
                    mlflow.log_metrics(
                        {"test_mape": mape, "test_mae": mae, "test_rmse": rmse}
                    )

                    # Store results
                    result = {**param_dict, "mape": mape, "mae": mae, "rmse": rmse}
                    results.append(result)

                    print(f"Config {i+1}/{len(param_combinations)}: MAPE = {mape:.4f}")

                except Exception as e:
                    print(f"Error in configuration {i+1}: {e}")
                    mlflow.log_param("error", str(e))

        # Find best configuration
        best_result = min(results, key=lambda x: x["mape"])

        # Log best configuration
        mlflow.log_params({f"best_{k}": v for k, v in best_result.items()})

        # Train final model with best parameters
        best_params = {
            k: v for k, v in best_result.items() if k not in ["mape", "mae", "rmse"]
        }
        final_model = Prophet(**best_params)
        final_model.fit(df)

        # Log final model
        mlflow.prophet.log_model(
            pr_model=final_model,
            name="best_model",
            input_example=df[["ds"]].head(),
        )

    return final_model, best_result, results


# Usage
best_model, best_config, all_results = optimize_prophet_hyperparameters(df)

Advanced Optimization with Optuna​

Bayesian Hyperparameter Optimization

import optuna


def objective(trial, df):
    """Optuna objective function for Prophet optimization."""

    # Define hyperparameter search space
    params = {
        "changepoint_prior_scale": trial.suggest_float(
            "changepoint_prior_scale", 0.001, 1.0, log=True
        ),
        "seasonality_prior_scale": trial.suggest_float(
            "seasonality_prior_scale", 0.01, 50.0, log=True
        ),
        "holidays_prior_scale": trial.suggest_float(
            "holidays_prior_scale", 0.01, 50.0, log=True
        ),
        "seasonality_mode": trial.suggest_categorical(
            "seasonality_mode", ["additive", "multiplicative"]
        ),
        "yearly_seasonality": trial.suggest_categorical(
            "yearly_seasonality", [True, False, "auto"]
        ),
        "weekly_seasonality": trial.suggest_categorical(
            "weekly_seasonality", [True, False, "auto"]
        ),
        "daily_seasonality": trial.suggest_categorical(
            "daily_seasonality", [True, False, "auto"]
        ),
    }

    with mlflow.start_run(nested=True):
        try:
            # Create and train model
            model = Prophet(**params)

            # Time series cross-validation (Prophet's fit returns the model)
            cv_results = cross_validation(
                model.fit(df),
                initial="730 days",
                period="180 days",
                horizon="90 days",
                parallel="threads",
            )

            # Calculate performance metric
            metrics = performance_metrics(cv_results)
            mape = metrics["mape"].mean()

            # Log trial results
            mlflow.log_params(params)
            mlflow.log_metric("cv_mape", mape)

            return mape

        except Exception as e:
            print(f"Trial failed: {e}")
            return float("inf")


def optuna_prophet_optimization(df, n_trials=100):
    """Run Optuna optimization for Prophet."""

    with mlflow.start_run(run_name="Optuna Prophet Optimization"):
        # Create study
        study = optuna.create_study(
            direction="minimize", sampler=optuna.samplers.TPESampler(seed=42)
        )

        # Optimize
        study.optimize(
            lambda trial: objective(trial, df),
            n_trials=n_trials,
            show_progress_bar=True,
        )

        # Log best results
        best_params = study.best_params
        best_value = study.best_value

        mlflow.log_params({f"best_{k}": v for k, v in best_params.items()})
        mlflow.log_metric("best_mape", best_value)

        # Train final model
        final_model = Prophet(**best_params)
        final_model.fit(df)

        mlflow.prophet.log_model(pr_model=final_model, name="optuna_best_model")

    return final_model, study


# Usage
# optimized_model, study = optuna_prophet_optimization(df, n_trials=50)

Model Deployment and Serving​

Model Loading and Prediction​

def load_and_predict_prophet_model(model_uri, future_periods=30):
    """Load Prophet model and generate predictions."""

    # Load model
    loaded_model = mlflow.prophet.load_model(model_uri)

    # Generate future dataframe
    future = loaded_model.make_future_dataframe(periods=future_periods)

    # Add any required regressors or caps
    if hasattr(loaded_model, "extra_regressors") and loaded_model.extra_regressors:
        # You would need to provide real values for external regressors;
        # this is a simplified example
        for regressor in loaded_model.extra_regressors:
            future[regressor] = np.random.normal(1000, 100, len(future))

    if loaded_model.growth == "logistic":
        future["cap"] = 10000  # Set appropriate capacity

    # Generate predictions
    forecast = loaded_model.predict(future)

    return forecast


# Usage
# run_id = "your_run_id_here"
# model_uri = f"runs:/{run_id}/prophet_model"
# predictions = load_and_predict_prophet_model(model_uri, future_periods=365)
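
Because mlflow.prophet also registers a generic pyfunc flavor, the same artifact can be loaded without Prophet-specific code; the pyfunc predict expects a dataframe with a 'ds' column. A minimal sketch (the URI is a placeholder):

import mlflow.pyfunc

# pyfunc_model = mlflow.pyfunc.load_model("runs:/your_run_id_here/prophet_model")
# future_df = pd.DataFrame({"ds": pd.date_range("2024-01-01", periods=30, freq="D")})
# predictions = pyfunc_model.predict(future_df)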

Production Deployment Patterns​

class ProphetForecaster:
    """Production-ready Prophet forecaster class."""

    def __init__(self, model_uri):
        self.model_uri = model_uri
        self.model = None
        self.last_training_date = None

    def load_model(self):
        """Load the Prophet model."""
        self.model = mlflow.prophet.load_model(self.model_uri)
        if hasattr(self.model, "history"):
            self.last_training_date = self.model.history["ds"].max()

    def predict(self, periods=30, frequency="D", include_history=False):
        """Generate predictions."""
        if self.model is None:
            self.load_model()

        # Generate future dataframe
        future = self.model.make_future_dataframe(
            periods=periods, freq=frequency, include_history=include_history
        )

        # Handle logistic growth
        if self.model.growth == "logistic":
            future["cap"] = future.get("cap", 10000)  # Default capacity

        # Generate forecast
        forecast = self.model.predict(future)

        # Return relevant columns
        columns = ["ds", "yhat", "yhat_lower", "yhat_upper"]
        if not include_history:
            # Return only future predictions
            forecast = forecast.tail(periods)

        return forecast[columns]

    def get_components(self, periods=30):
        """Get forecast components."""
        if self.model is None:
            self.load_model()

        future = self.model.make_future_dataframe(periods=periods)
        if self.model.growth == "logistic":
            future["cap"] = future.get("cap", 10000)

        forecast = self.model.predict(future)

        # Extract components
        components = {}
        for component in ["trend", "yearly", "weekly"]:
            if component in forecast.columns:
                components[component] = forecast[["ds", component]].tail(periods)

        return components

    def check_model_freshness(self, current_date=None):
        """Check if model needs retraining."""
        if current_date is None:
            current_date = pd.Timestamp.now()

        if self.last_training_date is None:
            return False, "No training date available"

        days_since_training = (current_date - self.last_training_date).days

        # Define freshness threshold (e.g., 30 days)
        freshness_threshold = 30

        is_fresh = days_since_training < freshness_threshold
        message = f"Model is {days_since_training} days old"

        return is_fresh, message


# Usage
forecaster = ProphetForecaster("models:/ProphetForecastModel/Production")
predictions = forecaster.predict(periods=90)
components = forecaster.get_components(periods=90)
is_fresh, message = forecaster.check_model_freshness()

Batch Prediction Pipeline

def batch_prophet_predictions(model_registry_name, stage="Production"):
    """Run batch predictions for multiple time series."""

    with mlflow.start_run(run_name="Batch Prophet Predictions"):
        # Load production model
        model_uri = f"models:/{model_registry_name}/{stage}"
        model = mlflow.prophet.load_model(model_uri)

        # Generate predictions for different horizons
        horizons = [30, 90, 365]  # Days
        predictions = {}

        for horizon in horizons:
            future = model.make_future_dataframe(periods=horizon)
            if model.growth == "logistic":
                future["cap"] = 10000  # Set capacity

            forecast = model.predict(future)

            # Store predictions
            predictions[f"{horizon}_days"] = forecast[
                ["ds", "yhat", "yhat_lower", "yhat_upper"]
            ].tail(horizon)

            # Log prediction summary
            pred_summary = {
                f"{horizon}d_mean_forecast": forecast["yhat"].tail(horizon).mean(),
                f"{horizon}d_forecast_range": forecast["yhat"].tail(horizon).max()
                - forecast["yhat"].tail(horizon).min(),
            }
            mlflow.log_metrics(pred_summary)

        # Save predictions as artifacts
        for horizon_name, pred_df in predictions.items():
            filename = f"predictions_{horizon_name}.csv"
            pred_df.to_csv(filename, index=False)
            mlflow.log_artifact(filename)

        # Log batch prediction metadata
        mlflow.log_params(
            {
                "model_uri": model_uri,
                "prediction_date": pd.Timestamp.now().isoformat(),
                "horizons": horizons,
            }
        )

    return predictions
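
A hypothetical invocation against a registered model (the registry name is a placeholder):

# Usage
# batch_results = batch_prophet_predictions("ProphetForecastModel", stage="Production")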

Model Monitoring and Maintenance​

Forecast Accuracy Monitoring​

def monitor_forecast_accuracy(model_uri, actuals_df, prediction_horizon_days=30):
    """Monitor Prophet model accuracy against actual values."""

    with mlflow.start_run(run_name="Forecast Accuracy Monitoring"):
        # Load model
        model = mlflow.prophet.load_model(model_uri)

        # Generate historical predictions for comparison
        cutoff_date = actuals_df["ds"].max() - pd.Timedelta(
            days=prediction_horizon_days
        )
        historical_data = actuals_df[actuals_df["ds"] <= cutoff_date]

        # Refit model on historical data
        temp_model = Prophet()
        temp_model.fit(historical_data)

        # Generate predictions for the monitoring period
        future = temp_model.make_future_dataframe(periods=prediction_horizon_days)
        if temp_model.growth == "logistic":
            future["cap"] = (
                historical_data["cap"].iloc[-1]
                if "cap" in historical_data.columns
                else 10000
            )

        forecast = temp_model.predict(future)

        # Get actual values for the prediction period
        actual_values = actuals_df[actuals_df["ds"] > cutoff_date]
        forecast_values = forecast[forecast["ds"] > cutoff_date]

        # Align dates
        merged = actual_values.merge(
            forecast_values[["ds", "yhat", "yhat_lower", "yhat_upper"]], on="ds"
        )

        if len(merged) > 0:
            # Calculate accuracy metrics
            mae = np.mean(np.abs(merged["y"] - merged["yhat"]))
            mape = np.mean(np.abs((merged["y"] - merged["yhat"]) / merged["y"])) * 100
            rmse = np.sqrt(np.mean((merged["y"] - merged["yhat"]) ** 2))

            # Coverage (percentage of actuals within prediction intervals)
            coverage = (
                np.mean(
                    (merged["y"] >= merged["yhat_lower"])
                    & (merged["y"] <= merged["yhat_upper"])
                )
                * 100
            )

            # Log metrics
            accuracy_metrics = {
                "monitoring_mae": mae,
                "monitoring_mape": mape,
                "monitoring_rmse": rmse,
                "prediction_coverage": coverage,
            }
            mlflow.log_metrics(accuracy_metrics)

            # Create accuracy visualization
            plt.figure(figsize=(12, 6))
            plt.plot(merged["ds"], merged["y"], label="Actual", marker="o")
            plt.plot(merged["ds"], merged["yhat"], label="Predicted", marker="s")
            plt.fill_between(
                merged["ds"],
                merged["yhat_lower"],
                merged["yhat_upper"],
                alpha=0.3,
                label="Prediction Interval",
            )
            plt.title(f"Forecast Accuracy Monitoring (MAPE: {mape:.2f}%)")
            plt.xlabel("Date")
            plt.ylabel("Value")
            plt.legend()
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.savefig("accuracy_monitoring.png", dpi=300, bbox_inches="tight")
            mlflow.log_artifact("accuracy_monitoring.png")
            plt.close()

            return accuracy_metrics
        else:
            print("No overlapping dates found for accuracy assessment")
            return None


# Usage
# accuracy_metrics = monitor_forecast_accuracy(model_uri, new_actuals_df, prediction_horizon_days=30)

Automated Model Retraining​

Production Model Update Pipeline

def automated_prophet_retraining(
    current_model_name, new_data, performance_threshold_mape=10.0, min_data_points=100
):
    """Automated Prophet model retraining pipeline."""

    with mlflow.start_run(run_name="Automated Prophet Retraining"):
        # Load current production model
        current_model_uri = f"models:/{current_model_name}/Production"

        try:
            current_model = mlflow.prophet.load_model(current_model_uri)
            mlflow.log_param("current_model_loaded", True)
        except Exception as e:
            print(f"Could not load current model: {e}")
            current_model = None
            mlflow.log_param("current_model_loaded", False)

        # Data quality checks
        data_quality_passed = True
        quality_issues = []

        # Check data quantity
        if len(new_data) < min_data_points:
            data_quality_passed = False
            quality_issues.append(
                f"Insufficient data: {len(new_data)} < {min_data_points}"
            )

        # Check for missing values
        missing_values = new_data[["ds", "y"]].isnull().sum().sum()
        if missing_values > 0:
            quality_issues.append(f"Missing values found: {missing_values}")

        # Check date continuity
        new_data = new_data.sort_values("ds")
        date_gaps = pd.to_datetime(new_data["ds"]).diff().dt.days
        large_gaps = (date_gaps > 7).sum()  # Gaps larger than 7 days
        if large_gaps > 0:
            quality_issues.append(f"Large date gaps found: {large_gaps}")

        mlflow.log_params(
            {
                "data_quality_passed": data_quality_passed,
                "data_points": len(new_data),
                "quality_issues": "; ".join(quality_issues),
            }
        )

        if not data_quality_passed:
            print("Data quality checks failed. Skipping retraining.")
            return None, False

        # Train new model
        new_model = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=True,
            daily_seasonality=False,
            changepoint_prior_scale=0.05,
        )

        new_model.fit(new_data)

        # Evaluate new model performance
        cv_results = cross_validation(
            new_model,
            initial="365 days",
            period="90 days",
            horizon="30 days",
            parallel="threads",
        )

        metrics = performance_metrics(cv_results)
        new_mape = metrics["mape"].mean()

        mlflow.log_metric("new_model_mape", new_mape)

        # Compare with current model if available
        should_deploy = True
        if current_model is not None:
            try:
                # Test current model on new data
                current_cv = cross_validation(
                    current_model,
                    initial="365 days",
                    period="90 days",
                    horizon="30 days",
                )
                current_metrics = performance_metrics(current_cv)
                current_mape = current_metrics["mape"].mean()

                mlflow.log_metric("current_model_mape", current_mape)

                # Deploy if new model is significantly better
                improvement = (current_mape - new_mape) / current_mape * 100
                mlflow.log_metric("performance_improvement_percent", improvement)

                should_deploy = improvement > 5.0  # Deploy if >5% improvement

            except Exception as e:
                print(f"Could not evaluate current model: {e}")
                should_deploy = new_mape < performance_threshold_mape
        else:
            should_deploy = new_mape < performance_threshold_mape

        mlflow.log_params(
            {
                "should_deploy": should_deploy,
                "performance_threshold": performance_threshold_mape,
            }
        )

        # Log and potentially deploy new model
        model_info = mlflow.prophet.log_model(
            pr_model=new_model,
            name="retrained_model",
            registered_model_name=current_model_name if should_deploy else None,
        )

        if should_deploy:
            # Transition to production
            client = mlflow.MlflowClient()
            latest_version = client.get_latest_versions(
                current_model_name, stages=["None"]
            )[0]

            client.transition_model_version_stage(
                name=current_model_name,
                version=latest_version.version,
                stage="Production",
            )

            print(f"New model deployed to production with MAPE: {new_mape:.2f}%")
        else:
            print(
                f"New model not deployed. MAPE: {new_mape:.2f}% did not meet criteria."
            )

    return new_model, should_deploy
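
A hypothetical invocation (the registered model name and data are placeholders):

# Usage
# new_model, deployed = automated_prophet_retraining(
#     "ProphetForecastModel", latest_data, performance_threshold_mape=10.0
# )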

Best Practices and Tips​

Data Preparation Best Practices​

def prophet_data_best_practices():
    """Demonstrate Prophet data preparation best practices."""

    best_practices = {
        "data_frequency": "Use consistent frequency (daily, weekly, monthly)",
        "missing_values": "Prophet handles missing values, but document them",
        "outliers": "Consider outlier detection and handling",
        "data_volume": "Minimum 2-3 seasonal cycles (2-3 years for yearly seasonality)",
        "column_names": "Always use 'ds' for dates and 'y' for values",
        "date_format": "Ensure dates are properly parsed as datetime",
        "timezone_handling": "Be consistent with timezone handling",
    }

    print("Prophet Data Preparation Best Practices:")
    for practice, description in best_practices.items():
        print(f"- {practice}: {description}")

    return best_practices
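
The outlier recommendation above can be implemented by blanking extreme points rather than dropping rows, since Prophet simply skips NaN values in 'y' during fitting. A minimal sketch using a rolling z-score (the window and threshold are illustrative):

def remove_outliers(df, window=30, z_thresh=3.0):
    """Set extreme points to NaN so Prophet ignores them during fitting."""
    rolling_mean = df["y"].rolling(window, center=True, min_periods=1).mean()
    rolling_std = df["y"].rolling(window, center=True, min_periods=1).std()
    z_scores = (df["y"] - rolling_mean) / rolling_std

    cleaned = df.copy()
    cleaned.loc[z_scores.abs() > z_thresh, "y"] = None  # Prophet tolerates NaN in 'y'
    return cleaned


# Usage
# df_clean = remove_outliers(df, window=30, z_thresh=3.0)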


# Data validation function
def validate_prophet_data(df):
    """Validate data for Prophet modeling."""

    issues = []
    recommendations = []

    # Check required columns
    if not all(col in df.columns for col in ["ds", "y"]):
        issues.append("Missing required columns 'ds' and/or 'y'")

    # Check data types
    if "ds" in df.columns and not pd.api.types.is_datetime64_any_dtype(df["ds"]):
        issues.append("Column 'ds' is not datetime type")
        recommendations.append(
            "Convert 'ds' to datetime: df['ds'] = pd.to_datetime(df['ds'])"
        )

    # Check for sufficient data
    if len(df) < 100:
        issues.append(f"Insufficient data points: {len(df)} (recommend >100)")

    # Check for missing values
    if "y" in df.columns:
        missing_y = df["y"].isnull().sum()
        if missing_y > 0:
            recommendations.append(
                f"Consider handling {missing_y} missing values in 'y'"
            )

    # Check for duplicate dates
    if "ds" in df.columns:
        duplicates = df["ds"].duplicated().sum()
        if duplicates > 0:
            issues.append(f"Found {duplicates} duplicate dates")

    # Check data range
    date_range = 0
    if "ds" in df.columns and len(df) > 0:
        date_range = (df["ds"].max() - df["ds"].min()).days
        if date_range < 365:
            recommendations.append(
                "Less than 1 year of data may limit seasonality detection"
            )

    return {
        "issues": issues,
        "recommendations": recommendations,
        "data_points": len(df),
        "date_range_days": date_range,
    }


# Usage
validation_results = validate_prophet_data(df)
print("Validation Results:", validation_results)

Performance Optimization​

Prophet Performance Tips

def optimize_prophet_performance():
    """Tips for optimizing Prophet performance."""

    optimization_tips = {
        "parallel_processing": {
            "cross_validation": "Use parallel='threads' or 'processes' in cross_validation()",
            "multiple_models": "Use joblib.Parallel for training multiple models",
        },
        "model_configuration": {
            "mcmc_samples": "Set mcmc_samples=0 for faster MAP estimation",
            "uncertainty_samples": "Reduce uncertainty_samples for faster predictions",
            "stan_backend": "Use 'CMDSTANPY' backend for better performance",
        },
        "data_preprocessing": {
            "frequency": "Aggregate to appropriate frequency (daily vs hourly)",
            "outliers": "Remove extreme outliers before training",
            "data_size": "Consider sampling for very large datasets",
        },
    }

    return optimization_tips


# Example of parallel cross-validation
def parallel_prophet_evaluation(models_dict, df):
    """Evaluate multiple Prophet models in parallel."""

    from joblib import Parallel, delayed

    def evaluate_single_model(name, model):
        try:
            cv_results = cross_validation(
                model.fit(df),
                initial="365 days",
                period="90 days",
                horizon="30 days",
                parallel="threads",
            )
            metrics = performance_metrics(cv_results)
            return name, metrics["mape"].mean()
        except Exception as e:
            print(f"Evaluation failed for {name}: {e}")
            return name, float("inf")

    # Parallel evaluation
    results = Parallel(n_jobs=-1)(
        delayed(evaluate_single_model)(name, model)
        for name, model in models_dict.items()
    )

    return dict(results)

Conclusion​

MLflow's Prophet integration provides a comprehensive solution for time series forecasting, experiment tracking, and model deployment. Whether you're forecasting business metrics, planning resources, or predicting future trends, the combination of Prophet's intuitive forecasting capabilities and MLflow's robust experiment management creates a powerful platform for professional time series analysis.

Key benefits of using MLflow with Prophet include:

  • Simplified Forecasting Workflow: Easy model logging and experiment tracking for time series projects
  • Comprehensive Validation: Built-in cross-validation and performance assessment tools
  • Business-Ready Features: Holiday modeling, custom seasonalities, and interpretable components
  • Production Deployment: Model registry integration with automated retraining capabilities
  • Collaborative Development: Team-friendly experiment sharing and model governance

The patterns and examples in this guide provide a solid foundation for building scalable, reliable time series forecasting systems. Start with basic Prophet models for immediate insights, then gradually adopt advanced features like custom seasonalities, automated hyperparameter tuning, and production monitoring as your forecasting needs evolve.

Prophet's philosophy of making forecasting accessible to business users, combined with MLflow's enterprise-grade experiment management, creates an ideal platform for data-driven decision making through accurate, interpretable time series predictions.