Dataset Evaluation
Dataset evaluation lets you assess model performance on pre-computed predictions without re-running the model. This is particularly useful for evaluating large-scale batch inference results or historical predictions, and for separating the prediction phase from the evaluation phase of a pipeline.
Quick Start: Evaluating Static Predictions
The simplest dataset evaluation involves a DataFrame with predictions and targets:
import mlflow
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
# Generate sample data and train a model
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
# Generate predictions (this could be from a batch job, stored results, etc.)
predictions = model.predict(X_test)
prediction_probabilities = model.predict_proba(X_test)[:, 1]
# Create evaluation dataset with predictions already computed
eval_dataset = pd.DataFrame(
{
"prediction": predictions,
"prediction_proba": prediction_probabilities,
"target": y_test,
}
)
# Add original features for analysis (optional)
feature_names = [f"feature_{i}" for i in range(X_test.shape[1])]
for i, feature_name in enumerate(feature_names):
eval_dataset[feature_name] = X_test[:, i]
with mlflow.start_run():
# Evaluate static dataset - no model needed!
result = mlflow.evaluate(
data=eval_dataset,
predictions="prediction", # Column containing predictions
targets="target", # Column containing true labels
model_type="classifier",
)
print(f"Accuracy: {result.metrics['accuracy_score']:.3f}")
print(f"F1 Score: {result.metrics['f1_score']:.3f}")
print(f"ROC AUC: {result.metrics['roc_auc']:.3f}")
This approach is perfect when:
- You have batch prediction results from a production system
- You want to evaluate historical predictions
- You're comparing different versions of the same model's outputs (see the sketch after this list)
- You need to separate compute-intensive prediction from evaluation
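Because evaluation only needs a prediction column, comparing two versions of the same model reduces to evaluating two columns of one DataFrame. The following is a minimal sketch, not part of the MLflow API; the DataFrame name and the "prediction_v1" / "prediction_v2" columns are hypothetical:
import mlflow
def compare_prediction_versions(eval_df, prediction_columns, target_column="target"):
    """Evaluate several pre-computed prediction columns against the same targets."""
    metrics_by_version = {}
    with mlflow.start_run(run_name="Model_Version_Comparison"):
        for column in prediction_columns:
            with mlflow.start_run(run_name=f"Eval_{column}", nested=True):
                result = mlflow.evaluate(
                    data=eval_df,
                    predictions=column,  # this version's prediction column
                    targets=target_column,
                    model_type="classifier",
                )
                metrics_by_version[column] = result.metrics
    return metrics_by_version
# Usage
# version_metrics = compare_prediction_versions(
#     eval_df, prediction_columns=["prediction_v1", "prediction_v2"]
# )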
Dataset Management
- MLflow PandasDataset
- Multi-Output Models
For more structured dataset management, use MLflow's PandasDataset:
import mlflow.data
# Create MLflow dataset with prediction column specified
dataset = mlflow.data.from_pandas(
eval_dataset,
predictions="prediction", # Specify prediction column
targets="target", # Specify target column
)
with mlflow.start_run():
# Log the dataset
mlflow.log_input(dataset, context="evaluation")
    # Evaluate using the dataset; prediction and target columns come from the dataset itself
    result = mlflow.evaluate(
        data=dataset,
        model_type="classifier",
    )
print("Evaluation completed using MLflow PandasDataset")
When your model produces multiple outputs, you can evaluate different output columns:
# Simulate multi-output model results
multi_output_data = pd.DataFrame(
{
"primary_prediction": predictions,
"confidence_score": prediction_probabilities,
"auxiliary_output": np.random.random(
len(predictions)
), # Additional model output
"target": y_test,
}
)
with mlflow.start_run():
# Evaluate primary prediction
result = mlflow.evaluate(
data=multi_output_data,
predictions="primary_prediction",
targets="target",
model_type="classifier",
)
# Access other outputs for custom analysis
confidence_scores = multi_output_data["confidence_score"]
auxiliary_outputs = multi_output_data["auxiliary_output"]
# Log additional analysis
mlflow.log_metrics(
{
"avg_confidence": confidence_scores.mean(),
"confidence_std": confidence_scores.std(),
"avg_auxiliary": auxiliary_outputs.mean(),
}
)
Batch Evaluation Workflows
- Large-Scale Batch Results
- Historical Performance Analysis
- Comparative Dataset Evaluation
For production batch inference results:
def evaluate_batch_predictions(batch_results_path, batch_size=10000):
"""Evaluate large batch prediction results efficiently."""
# Read batch results (could be from S3, database, etc.)
batch_df = pd.read_parquet(batch_results_path)
print(f"Evaluating {len(batch_df)} batch predictions")
with mlflow.start_run(run_name="Batch_Evaluation"):
# Log batch metadata
mlflow.log_params(
{
"batch_size": len(batch_df),
"batch_date": batch_df.get("prediction_date", "unknown").iloc[0]
if len(batch_df) > 0
else "unknown",
"data_source": batch_results_path,
}
)
# Evaluate full batch
result = mlflow.evaluate(
data=batch_df,
predictions="model_prediction",
targets="true_label",
model_type="classifier",
)
# Additional batch-specific analysis
if "prediction_timestamp" in batch_df.columns:
# Analyze performance over time
batch_df["hour"] = pd.to_datetime(batch_df["prediction_timestamp"]).dt.hour
hourly_accuracy = batch_df.groupby("hour").apply(
lambda x: (x["model_prediction"] == x["true_label"]).mean()
)
# Log time-based metrics
for hour, accuracy in hourly_accuracy.items():
mlflow.log_metric(f"accuracy_hour_{hour}", accuracy)
return result
# Usage
# result = evaluate_batch_predictions("s3://my-bucket/batch-predictions/2024-01-15.parquet")
Analyze model performance trends over time:
def analyze_historical_performance(historical_data, time_column="prediction_date"):
"""Analyze model performance trends over time."""
historical_data[time_column] = pd.to_datetime(historical_data[time_column])
with mlflow.start_run(run_name="Historical_Performance_Analysis"):
# Overall historical evaluation
overall_result = mlflow.evaluate(
data=historical_data,
predictions="prediction",
targets="actual",
model_type="classifier",
)
# Time-based analysis
historical_data["month"] = historical_data[time_column].dt.to_period("M")
monthly_performance = []
for month in historical_data["month"].unique():
month_data = historical_data[historical_data["month"] == month]
if len(month_data) > 50: # Minimum samples for reliable metrics
with mlflow.start_run(run_name=f"Month_{month}", nested=True):
month_result = mlflow.evaluate(
data=month_data,
predictions="prediction",
targets="actual",
model_type="classifier",
)
monthly_performance.append(
{
"month": str(month),
"accuracy": month_result.metrics["accuracy_score"],
"f1": month_result.metrics["f1_score"],
"sample_count": len(month_data),
}
)
# Log trend analysis
if monthly_performance:
perf_df = pd.DataFrame(monthly_performance)
# Calculate trends
accuracy_trend = np.polyfit(range(len(perf_df)), perf_df["accuracy"], 1)[0]
f1_trend = np.polyfit(range(len(perf_df)), perf_df["f1"], 1)[0]
mlflow.log_metrics(
{
"accuracy_trend_slope": accuracy_trend,
"f1_trend_slope": f1_trend,
"performance_variance": perf_df["accuracy"].var(),
"months_analyzed": len(monthly_performance),
}
)
# Save trend data
perf_df.to_csv("monthly_performance.csv", index=False)
mlflow.log_artifact("monthly_performance.csv")
return overall_result, monthly_performance
# Usage example
# historical_result, trends = analyze_historical_performance(historical_predictions_df)
Compare model performance across different datasets or data slices:
def compare_datasets(datasets_dict, model_type="classifier"):
"""Compare model performance across multiple datasets."""
comparison_results = {}
with mlflow.start_run(run_name="Dataset_Comparison"):
for dataset_name, dataset in datasets_dict.items():
with mlflow.start_run(run_name=f"Dataset_{dataset_name}", nested=True):
result = mlflow.evaluate(
data=dataset,
predictions="prediction",
targets="target",
model_type=model_type,
)
comparison_results[dataset_name] = result.metrics
# Log dataset characteristics
mlflow.log_params(
{
"dataset_name": dataset_name,
"dataset_size": len(dataset),
"positive_rate": dataset["target"].mean()
if model_type == "classifier"
else None,
}
)
# Log comparison metrics
if comparison_results:
accuracy_values = [
r.get("accuracy_score", 0) for r in comparison_results.values()
]
mlflow.log_metrics(
{
"max_accuracy": max(accuracy_values),
"min_accuracy": min(accuracy_values),
"accuracy_range": max(accuracy_values) - min(accuracy_values),
"datasets_compared": len(comparison_results),
}
)
return comparison_results
# Usage (assuming prediction DataFrames for each split already exist)
# datasets = {
#     "validation_set": validation_predictions_df,
#     "test_set": test_predictions_df,
#     "holdout_set": holdout_predictions_df,
# }
# comparison = compare_datasets(datasets)
Working with Large Datasets
- Chunked Processing
- Sampling-Based Evaluation
- Memory Optimization
For datasets too large to fit in memory:
def evaluate_large_dataset_in_chunks(data_path, chunk_size=50000):
    """Evaluate very large datasets by processing in chunks."""
    import pyarrow.parquet as pq  # pandas.read_parquet has no chunksize; stream batches with pyarrow
# Read data in chunks
chunk_results = []
total_samples = 0
with mlflow.start_run(run_name="Large_Dataset_Evaluation"):
        parquet_file = pq.ParquetFile(data_path)
        for chunk_idx, record_batch in enumerate(
            parquet_file.iter_batches(batch_size=chunk_size)
        ):
            chunk = record_batch.to_pandas()
chunk_size_actual = len(chunk)
total_samples += chunk_size_actual
# Evaluate chunk
with mlflow.start_run(run_name=f"Chunk_{chunk_idx}", nested=True):
chunk_result = mlflow.evaluate(
data=chunk,
predictions="prediction",
targets="target",
model_type="classifier",
)
# Weight metrics by chunk size for aggregation
weighted_metrics = {
f"{k}_weighted": v * chunk_size_actual
for k, v in chunk_result.metrics.items()
if isinstance(v, (int, float))
}
chunk_results.append(
{
"chunk_idx": chunk_idx,
"chunk_size": chunk_size_actual,
"metrics": chunk_result.metrics,
"weighted_metrics": weighted_metrics,
}
)
mlflow.log_param("chunk_size", chunk_size_actual)
# Aggregate results across chunks
if chunk_results:
# Calculate weighted averages
total_weighted = {}
for chunk in chunk_results:
for metric, value in chunk["weighted_metrics"].items():
total_weighted[metric] = total_weighted.get(metric, 0) + value
# Log aggregated metrics
aggregated_metrics = {
k.replace("_weighted", "_aggregate"): v / total_samples
for k, v in total_weighted.items()
}
mlflow.log_metrics(aggregated_metrics)
mlflow.log_params(
{
"total_samples": total_samples,
"chunks_processed": len(chunk_results),
"avg_chunk_size": total_samples / len(chunk_results),
}
)
return chunk_results
# Usage
# results = evaluate_large_dataset_in_chunks("large_predictions.parquet")
For extremely large datasets, use statistical sampling:
def evaluate_with_sampling(large_dataset, sample_size=10000, n_samples=5):
"""Evaluate large dataset using multiple random samples."""
sample_results = []
with mlflow.start_run(run_name="Sampling_Based_Evaluation"):
for sample_idx in range(n_samples):
# Create random sample
if len(large_dataset) > sample_size:
sample_data = large_dataset.sample(
n=sample_size, random_state=sample_idx
)
else:
sample_data = large_dataset.copy()
with mlflow.start_run(run_name=f"Sample_{sample_idx}", nested=True):
sample_result = mlflow.evaluate(
data=sample_data,
predictions="prediction",
targets="target",
model_type="classifier",
)
sample_results.append(sample_result.metrics)
mlflow.log_params(
{
"sample_idx": sample_idx,
"sample_size": len(sample_data),
"random_seed": sample_idx,
}
)
# Aggregate sample results
if sample_results:
# Calculate statistics across samples
metrics_df = pd.DataFrame(sample_results)
aggregated_stats = {}
for metric in metrics_df.columns:
if metrics_df[metric].dtype in ["float64", "int64"]:
aggregated_stats.update(
{
f"{metric}_mean": metrics_df[metric].mean(),
f"{metric}_std": metrics_df[metric].std(),
f"{metric}_min": metrics_df[metric].min(),
f"{metric}_max": metrics_df[metric].max(),
}
)
mlflow.log_metrics(aggregated_stats)
mlflow.log_params(
{
"sampling_strategy": "random",
"samples_taken": n_samples,
"target_sample_size": sample_size,
}
)
# Save detailed results
metrics_df.to_csv("sample_results.csv", index=False)
mlflow.log_artifact("sample_results.csv")
return sample_results, aggregated_stats
# Usage
# samples, stats = evaluate_with_sampling(very_large_dataset, sample_size=5000, n_samples=10)
Best practices for memory-efficient evaluation:
def memory_efficient_evaluation(large_dataset_path, chunk_size=10000):
    """Memory-efficient evaluation of large datasets."""
    import pyarrow.parquet as pq  # pandas.read_parquet cannot stream chunks; use pyarrow
chunk_metrics = []
with mlflow.start_run(run_name="Memory_Efficient_Evaluation"):
        parquet_file = pq.ParquetFile(large_dataset_path)
        for record_batch in parquet_file.iter_batches(batch_size=chunk_size):
            chunk = record_batch.to_pandas()
# Process chunk
chunk_result = mlflow.evaluate(
data=chunk,
predictions="prediction",
targets="target",
model_type="classifier",
)
# Store only essential metrics
chunk_metrics.append(
{
"size": len(chunk),
"accuracy": chunk_result.metrics["accuracy_score"],
"f1_score": chunk_result.metrics["f1_score"],
}
)
# Clear chunk from memory
del chunk
del chunk_result
# Compute weighted averages
total_samples = sum(cm["size"] for cm in chunk_metrics)
weighted_accuracy = (
sum(cm["accuracy"] * cm["size"] for cm in chunk_metrics) / total_samples
)
weighted_f1 = (
sum(cm["f1_score"] * cm["size"] for cm in chunk_metrics) / total_samples
)
# Log final metrics
mlflow.log_metrics(
{
"final_weighted_accuracy": weighted_accuracy,
"final_weighted_f1": weighted_f1,
"total_samples_processed": total_samples,
"chunks_processed": len(chunk_metrics),
}
)
return weighted_accuracy, chunk_metrics
# Usage
# accuracy, metrics = memory_efficient_evaluation("very_large_predictions.parquet")
Memory Management Tips (illustrated in the sketch after this list):
- Chunked Processing: Use chunked processing for datasets larger than 1GB
- Data Types: Use appropriate data types (int32 vs int64) to reduce memory usage
- Garbage Collection: Explicitly delete large variables when done processing
- Streaming: Process data in streaming fashion when possible
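A minimal sketch of the data-type and garbage-collection tips above; the helper and column names are illustrative assumptions, not part of MLflow:
import gc
import pandas as pd
def load_predictions_compactly(path):
    """Read a prediction file with memory-friendly dtypes."""
    df = pd.read_parquet(path)
    # Downcast integer label columns (e.g. int64 -> int8/int32 where the value range allows)
    df["prediction"] = pd.to_numeric(df["prediction"], downcast="integer")
    df["target"] = pd.to_numeric(df["target"], downcast="integer")
    # Downcast float score/feature columns (float64 -> float32)
    for column in df.select_dtypes(include="float64").columns:
        df[column] = pd.to_numeric(df[column], downcast="float")
    return df
# After each chunk or evaluation pass, release large intermediates explicitly:
# del chunk, chunk_result
# gc.collect()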
Key Use Cases and Benefits
Dataset evaluation in MLflow is particularly valuable for several scenarios:
Batch Processing - Perfect for evaluating large-scale batch prediction results from production systems without re-running expensive inference.
Historical Analysis - Ideal for analyzing model performance trends over time using previously computed predictions and ground truth data.
Model Comparison - Excellent for comparing different model versions' outputs on the same dataset without re-training or re-inference.
Production Monitoring - Essential for automated evaluation pipelines that assess model performance on incoming batch predictions.
Cost Optimization - Reduces computational costs by separating prediction generation from performance assessment, allowing evaluation without model re-execution.
Best Practices
When using dataset evaluation, consider these best practices; a minimal validation sketch follows the list:
- Data Validation: Always validate that prediction and target columns contain expected data types and ranges
- Missing Values: Handle missing predictions or targets appropriately before evaluation
- Memory Management: Use chunked processing or sampling for very large datasets
- Metadata Logging: Log dataset characteristics, processing parameters, and evaluation context
- Storage Formats: Use efficient formats like Parquet for large prediction datasets
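The sketch below illustrates the data-validation and missing-value practices, assuming "prediction" and "target" columns; adapt the checks to your own schema:
def validate_eval_dataframe(df, prediction_col="prediction", target_col="target"):
    """Basic sanity checks before calling mlflow.evaluate on a static dataset."""
    for column in (prediction_col, target_col):
        if column not in df.columns:
            raise ValueError(f"Missing required column: {column}")
    # Handle missing values explicitly instead of letting them skew metrics
    missing_mask = df[[prediction_col, target_col]].isna().any(axis=1)
    if missing_mask.any():
        print(f"Dropping {missing_mask.sum()} rows with missing predictions or targets")
        df = df.loc[~missing_mask]
    # For classifiers, flag predicted labels that never appear in the targets
    unexpected = set(df[prediction_col].unique()) - set(df[target_col].unique())
    if unexpected:
        print(f"Warning: predictions contain unseen labels: {unexpected}")
    return df
# Usage
# clean_df = validate_eval_dataframe(eval_dataset)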
Conclusion
Dataset evaluation in MLflow provides powerful capabilities for assessing model performance on pre-computed predictions. This approach is essential for production ML systems where you need to separate prediction generation from performance assessment.
Key advantages of dataset evaluation include:
- Flexibility: Evaluate predictions from any source without re-running models
- Efficiency: Skip expensive model inference when predictions are already available
- Scale: Handle large-scale batch predictions and historical analysis
- Integration: Seamlessly work with production prediction pipelines
Whether you're analyzing batch predictions, conducting historical performance studies, or implementing automated evaluation pipelines, MLflow's dataset evaluation capabilities provide the tools needed for comprehensive model assessment at scale.