import importlib
import importlib.metadata
import os
import shlex
import sys
import textwrap
import types
from flask import Flask, Response, send_from_directory
from packaging.version import Version
from mlflow.exceptions import MlflowException
from mlflow.server import handlers
from mlflow.server.handlers import (
STATIC_PREFIX_ENV_VAR,
_add_static_prefix,
create_promptlab_run_handler,
gateway_proxy_handler,
get_artifact_handler,
get_metric_history_bulk_handler,
get_metric_history_bulk_interval_handler,
get_model_version_artifact_handler,
get_trace_artifact_handler,
search_datasets_handler,
upload_artifact_handler,
)
from mlflow.utils.os import is_windows
from mlflow.utils.plugins import get_entry_points
from mlflow.utils.process import _exec_cmd
from mlflow.version import VERSION
# NB: These are internal environment variables used for communication between
# the cli and the forked gunicorn processes.
BACKEND_STORE_URI_ENV_VAR = "_MLFLOW_SERVER_FILE_STORE"
REGISTRY_STORE_URI_ENV_VAR = "_MLFLOW_SERVER_REGISTRY_STORE"
ARTIFACT_ROOT_ENV_VAR = "_MLFLOW_SERVER_ARTIFACT_ROOT"
ARTIFACTS_DESTINATION_ENV_VAR = "_MLFLOW_SERVER_ARTIFACT_DESTINATION"
PROMETHEUS_EXPORTER_ENV_VAR = "prometheus_multiproc_dir"
SERVE_ARTIFACTS_ENV_VAR = "_MLFLOW_SERVER_SERVE_ARTIFACTS"
ARTIFACTS_ONLY_ENV_VAR = "_MLFLOW_SERVER_ARTIFACTS_ONLY"
REL_STATIC_DIR = "js/build"
app = Flask(__name__, static_folder=REL_STATIC_DIR)
IS_FLASK_V1 = Version(importlib.metadata.version("flask")) < Version("2.0")
for http_path, handler, methods in handlers.get_endpoints():
app.add_url_rule(http_path, handler.__name__, handler, methods=methods)
if os.getenv(PROMETHEUS_EXPORTER_ENV_VAR):
from mlflow.server.prometheus_exporter import activate_prometheus_exporter
prometheus_metrics_path = os.getenv(PROMETHEUS_EXPORTER_ENV_VAR)
if not os.path.exists(prometheus_metrics_path):
os.makedirs(prometheus_metrics_path)
activate_prometheus_exporter(app)
# Provide a health check endpoint to ensure the application is responsive
@app.route("/health")
def health():
return "OK", 200
# Provide an endpoint to query the version of mlflow running on the server
@app.route("/version")
def version():
return VERSION, 200
# Serve the "get-artifact" route.
@app.route(_add_static_prefix("/get-artifact"))
def serve_artifacts():
return get_artifact_handler()
# Serve the "model-versions/get-artifact" route.
@app.route(_add_static_prefix("/model-versions/get-artifact"))
def serve_model_version_artifact():
return get_model_version_artifact_handler()
# Serve the "metrics/get-history-bulk" route.
@app.route(_add_static_prefix("/ajax-api/2.0/mlflow/metrics/get-history-bulk"))
def serve_get_metric_history_bulk():
return get_metric_history_bulk_handler()
# Serve the "metrics/get-history-bulk-interval" route.
@app.route(_add_static_prefix("/ajax-api/2.0/mlflow/metrics/get-history-bulk-interval"))
def serve_get_metric_history_bulk_interval():
return get_metric_history_bulk_interval_handler()
# Serve the "experiments/search-datasets" route.
@app.route(_add_static_prefix("/ajax-api/2.0/mlflow/experiments/search-datasets"), methods=["POST"])
def serve_search_datasets():
return search_datasets_handler()
# Serve the "runs/create-promptlab-run" route.
@app.route(_add_static_prefix("/ajax-api/2.0/mlflow/runs/create-promptlab-run"), methods=["POST"])
def serve_create_promptlab_run():
return create_promptlab_run_handler()
@app.route(_add_static_prefix("/ajax-api/2.0/mlflow/gateway-proxy"), methods=["POST", "GET"])
def serve_gateway_proxy():
return gateway_proxy_handler()
@app.route(_add_static_prefix("/ajax-api/2.0/mlflow/upload-artifact"), methods=["POST"])
def serve_upload_artifact():
return upload_artifact_handler()
# Serve the "/get-trace-artifact" route to allow frontend to fetch trace artifacts
# and render them in the Trace UI. The request body should contain the request_id
# of the trace.
@app.route(_add_static_prefix("/ajax-api/2.0/mlflow/get-trace-artifact"), methods=["GET"])
def serve_get_trace_artifact():
return get_trace_artifact_handler()
# We expect the react app to be built assuming it is hosted at /static-files, so that requests for
# CSS/JS resources will be made to e.g. /static-files/main.css and we can handle them here.
# The files are hashed based on source code, so ok to send Cache-Control headers via max_age.
@app.route(_add_static_prefix("/static-files/<path:path>"))
def serve_static_file(path):
if IS_FLASK_V1:
return send_from_directory(app.static_folder, path, cache_timeout=2419200)
else:
return send_from_directory(app.static_folder, path, max_age=2419200)
# Serve the index.html for the React App for all other routes.
@app.route(_add_static_prefix("/"))
def serve():
if os.path.exists(os.path.join(app.static_folder, "index.html")):
return send_from_directory(app.static_folder, "index.html")
text = textwrap.dedent(
"""
Unable to display MLflow UI - landing page (index.html) not found.
You are very likely running the MLflow server using a source installation of the Python MLflow
package.
If you are a developer making MLflow source code changes and intentionally running a source
installation of MLflow, you can view the UI by running the Javascript dev server:
https://github.com/mlflow/mlflow/blob/master/CONTRIBUTING.md#running-the-javascript-dev-server
Otherwise, uninstall MLflow via 'pip uninstall mlflow', reinstall an official MLflow release
from PyPI via 'pip install mlflow', and rerun the MLflow server.
"""
)
return Response(text, mimetype="text/plain")
def _find_app(app_name: str) -> str:
apps = get_entry_points("mlflow.app")
for app in apps:
if app.name == app_name:
return app.value
raise MlflowException(
f"Failed to find app '{app_name}'. Available apps: {[a.name for a in apps]}"
)
def _is_factory(app: str) -> bool:
"""
Returns True if the given app is a factory function, False otherwise.
Args:
app: The app to check, e.g. "mlflow.server.app:app
"""
module, obj_name = app.rsplit(":", 1)
mod = importlib.import_module(module)
obj = getattr(mod, obj_name)
return isinstance(obj, types.FunctionType)
[docs]def get_app_client(app_name: str, *args, **kwargs):
"""
Instantiate a client provided by an app.
Args:
app_name: The app name defined in `setup.py`, e.g., "basic-auth".
args: Additional arguments passed to the app client constructor.
kwargs: Additional keyword arguments passed to the app client constructor.
Returns:
An app client instance.
"""
clients = get_entry_points("mlflow.app.client")
for client in clients:
if client.name == app_name:
cls = client.load()
return cls(*args, **kwargs)
raise MlflowException(
f"Failed to find client for '{app_name}'. Available clients: {[c.name for c in clients]}"
)
def _build_waitress_command(waitress_opts, host, port, app_name, is_factory):
opts = shlex.split(waitress_opts) if waitress_opts else []
return [
sys.executable,
"-m",
"waitress",
*opts,
f"--host={host}",
f"--port={port}",
"--ident=mlflow",
*(["--call"] if is_factory else []),
app_name,
]
def _build_gunicorn_command(gunicorn_opts, host, port, workers, app_name):
bind_address = f"{host}:{port}"
opts = shlex.split(gunicorn_opts) if gunicorn_opts else []
return [
sys.executable,
"-m",
"gunicorn",
*opts,
"-b",
bind_address,
"-w",
str(workers),
app_name,
]
def _run_server(
file_store_path,
registry_store_uri,
default_artifact_root,
serve_artifacts,
artifacts_only,
artifacts_destination,
host,
port,
static_prefix=None,
workers=None,
gunicorn_opts=None,
waitress_opts=None,
expose_prometheus=None,
app_name=None,
):
"""
Run the MLflow server, wrapping it in gunicorn or waitress on windows
Args:
static_prefix: If set, the index.html asset will be served from the path static_prefix.
If left None, the index.html asset will be served from the root path.
Returns:
None
"""
env_map = {}
if file_store_path:
env_map[BACKEND_STORE_URI_ENV_VAR] = file_store_path
if registry_store_uri:
env_map[REGISTRY_STORE_URI_ENV_VAR] = registry_store_uri
if default_artifact_root:
env_map[ARTIFACT_ROOT_ENV_VAR] = default_artifact_root
if serve_artifacts:
env_map[SERVE_ARTIFACTS_ENV_VAR] = "true"
if artifacts_only:
env_map[ARTIFACTS_ONLY_ENV_VAR] = "true"
if artifacts_destination:
env_map[ARTIFACTS_DESTINATION_ENV_VAR] = artifacts_destination
if static_prefix:
env_map[STATIC_PREFIX_ENV_VAR] = static_prefix
if expose_prometheus:
env_map[PROMETHEUS_EXPORTER_ENV_VAR] = expose_prometheus
if app_name is None:
app = f"{__name__}:app"
is_factory = False
else:
app = _find_app(app_name)
is_factory = _is_factory(app)
# `waitress` doesn't support `()` syntax for factory functions.
# Instead, we need to use the `--call` flag.
app = f"{app}()" if (not is_windows() and is_factory) else app
# TODO: eventually may want waitress on non-win32
if sys.platform == "win32":
full_command = _build_waitress_command(waitress_opts, host, port, app, is_factory)
else:
full_command = _build_gunicorn_command(gunicorn_opts, host, port, workers or 4, app)
_exec_cmd(full_command, extra_env=env_map, capture_output=False)