"""
The ``mlflow.diviner`` module provides an API for logging, saving and loading ``diviner`` models.
Diviner wraps several popular open source time series forecasting libraries in a unified API that
permits training, back-testing cross-validation, and forecasting inference for groups of related
series.
This module exports groups of univariate ``diviner`` models in the following formats:

Diviner format
    Serialized instance of a ``diviner`` model type using native diviner serializers.
    (e.g., "GroupedProphet" or "GroupedPmdarima")
:py:mod:`mlflow.pyfunc`
    Produced for use by generic pyfunc-based deployment tools and for batch auditing
    of historical forecasts.

.. _Diviner:
    https://databricks-diviner.readthedocs.io/en/latest/index.html
"""
import logging
import os
import pathlib
import shutil
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
import yaml

import mlflow
from mlflow import pyfunc
from mlflow.environment_variables import MLFLOW_DFS_TMP
from mlflow.exceptions import MlflowException
from mlflow.models import Model, ModelInputExample, ModelSignature
from mlflow.models.model import MLMODEL_FILE_NAME
from mlflow.models.signature import _infer_signature_from_input_example
from mlflow.models.utils import _save_example
from mlflow.protos.databricks_pb2 import INVALID_PARAMETER_VALUE
from mlflow.tracking._model_registry import DEFAULT_AWAIT_MAX_SLEEP_SECONDS
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils.docstring_utils import LOG_MODEL_PARAM_DOCS, format_docstring
from mlflow.utils.environment import (
    _CONDA_ENV_FILE_NAME,
    _CONSTRAINTS_FILE_NAME,
    _PYTHON_ENV_FILE_NAME,
    _REQUIREMENTS_FILE_NAME,
    _mlflow_conda_env,
    _process_conda_env,
    _process_pip_requirements,
    _PythonEnv,
    _validate_env_arguments,
)
from mlflow.utils.file_utils import (
    get_total_file_size,
    shutil_copytree_without_file_permissions,
    write_to,
)
from mlflow.utils.model_utils import (
    _add_code_from_conf_to_system_path,
    _get_flavor_configuration,
    _get_flavor_configuration_from_uri,
    _validate_and_copy_code_paths,
    _validate_and_prepare_target_save_path,
)
from mlflow.utils.requirements_utils import _get_pinned_requirement
from mlflow.utils.uri import dbfs_hdfs_uri_to_fuse_path, generate_tmp_dfs_path

FLAVOR_NAME = "diviner"
_MODEL_BINARY_KEY = "data"
_MODEL_BINARY_FILE_NAME = "model.div"
_MODEL_TYPE_KEY = "model_type"
_FLAVOR_KEY = "flavors"
_SPARK_MODEL_INDICATOR = "fit_with_spark"

_logger = logging.getLogger(__name__)


def get_default_pip_requirements():
    """
    Returns:
        A list of default pip requirements for MLflow Models produced with the ``Diviner``
        flavor. Calls to :py:func:`save_model()` and :py:func:`log_model()` produce a pip
        environment that, at a minimum, contains these requirements.
    """
    return [_get_pinned_requirement("diviner")]


def get_default_conda_env():
    """
    Returns:
        The default Conda environment for MLflow Models produced with the ``Diviner`` flavor
        that is produced by calls to :py:func:`save_model()` and :py:func:`log_model()`.
    """
    return _mlflow_conda_env(additional_pip_deps=get_default_pip_requirements())


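# Illustrative sketch (not part of this module's API): these helpers supply the pip and
# conda environment specifications written alongside a saved model. The pinned version
# shown is hypothetical; it depends on the ``diviner`` release installed at save time.
#
#   import mlflow.diviner
#
#   mlflow.diviner.get_default_pip_requirements()
#   # e.g. ["diviner==0.1.1"]
#   mlflow.diviner.get_default_conda_env()
#   # e.g. {"name": "mlflow-env", "channels": [...], "dependencies": [..., {"pip": [...]}]}

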
@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def save_model(
    diviner_model,
    path,
    conda_env=None,
    code_paths=None,
    mlflow_model=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
    **kwargs,
):
    """Save a ``Diviner`` model object to a path on the local file system.

    Args:
        diviner_model: ``Diviner`` model that has been ``fit`` on a grouped temporal
            ``DataFrame``.
        path: Local path destination where the serialized model is to be saved.
        conda_env: {{ conda_env }}
        code_paths: {{ code_paths }}
        mlflow_model: :py:mod:`mlflow.models.Model` this flavor is being added to.
        signature: :py:class:`Model Signature <mlflow.models.ModelSignature>` describes model
            input and output :py:class:`Schema <mlflow.types.Schema>`. The model signature can
            be :py:func:`inferred <mlflow.models.infer_signature>` from datasets with valid
            model input (e.g. the training dataset with target column omitted) and valid model
            output (e.g. model predictions generated on the training dataset), for example:

            .. code-block:: python

                from mlflow.models import infer_signature

                model = diviner.GroupedProphet().fit(data, ("region", "state"))
                predictions = model.predict(prediction_config)
                signature = infer_signature(data, predictions)

        input_example: {{ input_example }}
        pip_requirements: {{ pip_requirements }}
        extra_pip_requirements: {{ extra_pip_requirements }}
        metadata: {{ metadata }}
        kwargs: Optional configurations for Spark DataFrame storage iff the model has been fit
            in Spark. Current supported options:

            - ``partition_by`` for setting a (or several) partition columns as a list of
              column names. Must be a list of strings of grouping key column(s).
            - ``partition_count`` for setting the number of part files to write from a
              repartition per ``partition_by`` group. The default part file count is 200.
            - ``dfs_tmpdir`` for specifying the DFS temporary location where the model will
              be stored while copying from a local file system to a Spark-supported "dbfs:/"
              scheme.
    """
    import diviner

    _validate_env_arguments(conda_env, pip_requirements, extra_pip_requirements)

    path = pathlib.Path(path).absolute()

    _validate_and_prepare_target_save_path(str(path))

    # NB: When moving to native pathlib implementations, path encoding as string will not be
    # needed.
    code_dir_subpath = _validate_and_copy_code_paths(code_paths, str(path))

    if mlflow_model is None:
        mlflow_model = Model()
    saved_example = _save_example(mlflow_model, input_example, str(path))

    if signature is None and saved_example is not None:
        wrapped_model = _DivinerModelWrapper(diviner_model)
        signature = _infer_signature_from_input_example(saved_example, wrapped_model)

    if signature is not None:
        mlflow_model.signature = signature
    if metadata is not None:
        mlflow_model.metadata = metadata

    fit_with_spark = _save_diviner_model(diviner_model, path, **kwargs)

    flavor_conf = {_SPARK_MODEL_INDICATOR: fit_with_spark}

    model_bin_kwargs = {_MODEL_BINARY_KEY: _MODEL_BINARY_FILE_NAME}
    pyfunc.add_to_model(
        mlflow_model,
        loader_module="mlflow.diviner",
        conda_env=_CONDA_ENV_FILE_NAME,
        python_env=_PYTHON_ENV_FILE_NAME,
        code=code_dir_subpath,
        **model_bin_kwargs,
    )

    flavor_conf.update({_MODEL_TYPE_KEY: diviner_model.__class__.__name__}, **model_bin_kwargs)
    mlflow_model.add_flavor(
        FLAVOR_NAME, diviner_version=diviner.__version__, code=code_dir_subpath, **flavor_conf
    )
    if size := get_total_file_size(path):
        mlflow_model.model_size_bytes = size
    mlflow_model.save(str(path.joinpath(MLMODEL_FILE_NAME)))

    if conda_env is None:
        if pip_requirements is None:
            default_reqs = get_default_pip_requirements()
            inferred_reqs = mlflow.models.infer_pip_requirements(
                str(path), FLAVOR_NAME, fallback=default_reqs
            )
            default_reqs = sorted(set(inferred_reqs).union(default_reqs))
        else:
            default_reqs = None
        conda_env, pip_requirements, pip_constraints = _process_pip_requirements(
            default_reqs, pip_requirements, extra_pip_requirements
        )
    else:
        conda_env, pip_requirements, pip_constraints = _process_conda_env(conda_env)

    with path.joinpath(_CONDA_ENV_FILE_NAME).open("w") as f:
        yaml.safe_dump(conda_env, stream=f, default_flow_style=False)

    if pip_constraints:
        write_to(str(path.joinpath(_CONSTRAINTS_FILE_NAME)), "\n".join(pip_constraints))

    write_to(str(path.joinpath(_REQUIREMENTS_FILE_NAME)), "\n".join(pip_requirements))

    _PythonEnv.current().to_yaml(str(path.joinpath(_PYTHON_ENV_FILE_NAME)))


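# Usage sketch for ``save_model`` (illustrative; ``df`` and the destination path are
# hypothetical). This mirrors the signature-inference example in the docstring above,
# using ``GroupedProphet.forecast()`` to produce sample output for the signature.
#
#   import diviner
#   import mlflow.diviner
#   from mlflow.models import infer_signature
#
#   model = diviner.GroupedProphet().fit(df, ("region", "state"))
#   predictions = model.forecast(horizon=30, frequency="D")
#   signature = infer_signature(df, predictions)
#   mlflow.diviner.save_model(model, path="/tmp/diviner_model", signature=signature)

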
def _save_diviner_model(diviner_model, path, **kwargs) -> bool:
    """
    Saves a Diviner model to the specified path. If the model was fit by using a Pandas
    DataFrame for the training data submitted to `fit`, directly save the Diviner model
    object. If the Diviner model was fit by using a Spark DataFrame, save the model
    components separately. The metadata and ancillary files to write (JSON and Pandas
    DataFrames) are written directly to a fuse mount location, while the Spark DataFrame
    that contains the individual serialized Diviner model objects is written by using the
    'dbfs:' scheme path that Spark recognizes.
    """
    save_path = str(path.joinpath(_MODEL_BINARY_FILE_NAME))
    if getattr(diviner_model, "_fit_with_spark", False):
        # Validate that the path is an absolute path early in order to fail fast, prior to
        # writing the (large) DataFrame to a tmp DFS path and only then raising a path
        # validation Exception within MLflow when attempting to copy the temporary write
        # files from DFS to the file system path provided.
        if not os.path.isabs(path):
            raise MlflowException(
                "The save path provided must be an absolute path. "
                f"The path submitted, '{path}' is a relative path."
            )
        # Create a temporary DFS location to write the Spark DataFrame containing the models to.
        tmp_path = generate_tmp_dfs_path(kwargs.get("dfs_tmpdir", MLFLOW_DFS_TMP.get()))
        # Save the model Spark DataFrame to the temporary DFS location.
        diviner_model._save_model_df_to_path(tmp_path, **kwargs)

        diviner_data_path = os.path.abspath(save_path)

        tmp_fuse_path = dbfs_hdfs_uri_to_fuse_path(tmp_path)
        shutil.move(src=tmp_fuse_path, dst=diviner_data_path)

        # Save the model metadata to the path location.
        diviner_model._save_model_metadata_components_to_path(path=diviner_data_path)
        return True
    diviner_model.save(save_path)
    return False


def _load_model_fit_in_spark(local_model_path: str, flavor_conf, **kwargs):
    """
    Loads a Diviner model that has been fit (and saved) in the Spark variant.
    """
    # NB: To load the model DataFrame (which is a Spark DataFrame), Spark requires that the file
    # partitions are in DFS. In order to facilitate this, the model DataFrame (saved as parquet)
    # will be copied to a temporary DFS location. The remaining files can be read directly from
    # the local file system path, which is handled within the Diviner APIs.
    import diviner

    dfs_temp_directory = generate_tmp_dfs_path(kwargs.get("dfs_tmpdir", MLFLOW_DFS_TMP.get()))
    dfs_fuse_directory = dbfs_hdfs_uri_to_fuse_path(dfs_temp_directory)
    os.makedirs(dfs_fuse_directory)
    shutil_copytree_without_file_permissions(src_dir=local_model_path, dst_dir=dfs_fuse_directory)

    diviner_instance = getattr(diviner, flavor_conf[_MODEL_TYPE_KEY])
    load_directory = os.path.join(dfs_fuse_directory, flavor_conf[_MODEL_BINARY_KEY])

    return diviner_instance.load(load_directory)


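# Illustrative sketch of the Spark-fit save path handled above (the model variable and
# all paths are hypothetical). A model fit on a Spark DataFrame routes through a DFS
# temporary directory and a fuse-mount copy, so the destination must be absolute, and
# the documented storage kwargs apply:
#
#   mlflow.diviner.save_model(
#       spark_fit_model,
#       path="/dbfs/tmp/diviner_spark_model",
#       partition_by=["region", "state"],
#       partition_count=32,
#       dfs_tmpdir="dbfs:/tmp/diviner_staging",
#   )

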
def load_model(model_uri, dst_path=None, **kwargs):
    """Load a ``Diviner`` object from a local file or a run.

    Args:
        model_uri: The location, in URI format, of the MLflow model. For example:

            - ``/Users/me/path/to/local/model``
            - ``relative/path/to/local/model``
            - ``s3://my_bucket/path/to/model``
            - ``runs:/<mlflow_run_id>/run-relative/path/to/model``
            - ``mlflow-artifacts:/path/to/model``

            For more information about supported URI schemes, see
            `Referencing Artifacts
            <https://www.mlflow.org/docs/latest/tracking.html#artifact-locations>`_.
        dst_path: The local filesystem path to which to download the model artifact.
            This directory must already exist if provided. If unspecified, a local output
            path will be created.
        kwargs: Optional configuration options for loading of a Diviner model. For models
            that have been fit and saved using Spark, if a specific DFS temporary directory
            is desired for loading of Diviner models, use the keyword argument
            ``"dfs_tmpdir"`` to define the temporary path used while loading the model.

    Returns:
        A ``Diviner`` model instance.
    """
    model_uri = str(model_uri)
    flavor_conf = _get_flavor_configuration_from_uri(model_uri, FLAVOR_NAME, _logger)
    local_model_path = _download_artifact_from_uri(artifact_uri=model_uri, output_path=dst_path)
    _add_code_from_conf_to_system_path(local_model_path, flavor_conf)

    if flavor_conf.get(_SPARK_MODEL_INDICATOR, False):
        return _load_model_fit_in_spark(local_model_path, flavor_conf, **kwargs)

    return _load_model(local_model_path, flavor_conf)


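# Usage sketch for ``load_model`` (the run id and artifact path are hypothetical).
# ``dfs_tmpdir`` is only consulted for models that were fit and saved with Spark.
#
#   import mlflow.diviner
#
#   model = mlflow.diviner.load_model("runs:/<mlflow_run_id>/diviner_model")
#   spark_model = mlflow.diviner.load_model(
#       "runs:/<mlflow_run_id>/diviner_model", dfs_tmpdir="dbfs:/tmp/diviner_load"
#   )

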
def _load_model(path, flavor_conf):
    """
    Loads a Diviner model instance that was not fit using Spark from a file system location.
    """
    import diviner

    local_path = pathlib.Path(path)

    diviner_model_path = local_path.joinpath(
        flavor_conf.get(_MODEL_BINARY_KEY, _MODEL_BINARY_FILE_NAME)
    )

    diviner_instance = getattr(diviner, flavor_conf[_MODEL_TYPE_KEY])
    return diviner_instance.load(str(diviner_model_path))


def _load_pyfunc(path):
    local_path = pathlib.Path(path)

    # NB: reverting the dir walk that happens with pyfunc's loading implementation
    if local_path.is_file():
        local_path = local_path.parent

    flavor_conf = _get_flavor_configuration(local_path, FLAVOR_NAME)

    if flavor_conf.get(_SPARK_MODEL_INDICATOR):
        raise MlflowException(
            "The model being loaded was fit in Spark. Diviner models fit in "
            "Spark do not support loading as pyfunc."
        )

    return _DivinerModelWrapper(_load_model(local_path, flavor_conf))


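# Illustrative sketch: loading through the generic pyfunc flavor resolves to
# ``_load_pyfunc`` above and returns a ``_DivinerModelWrapper``; Spark-fit models
# raise instead of loading. The model URI is hypothetical.
#
#   import mlflow.pyfunc
#
#   pyfunc_model = mlflow.pyfunc.load_model("runs:/<mlflow_run_id>/diviner_model")

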
@format_docstring(LOG_MODEL_PARAM_DOCS.format(package_name=FLAVOR_NAME))
def log_model(
    diviner_model,
    artifact_path,
    conda_env=None,
    code_paths=None,
    registered_model_name=None,
    signature: ModelSignature = None,
    input_example: ModelInputExample = None,
    await_registration_for=DEFAULT_AWAIT_MAX_SLEEP_SECONDS,
    pip_requirements=None,
    extra_pip_requirements=None,
    metadata=None,
    **kwargs,
):
    """Log a ``Diviner`` object as an MLflow artifact for the current run.

    Args:
        diviner_model: ``Diviner`` model that has been ``fit`` on a grouped temporal
            ``DataFrame``.
        artifact_path: Run-relative artifact path to save the model instance to.
        conda_env: {{ conda_env }}
        code_paths: {{ code_paths }}
        registered_model_name: This argument may change or be removed in a future release
            without warning. If given, create a model version under ``registered_model_name``,
            also creating a registered model if one with the given name does not exist.
        signature: :py:class:`Model Signature <mlflow.models.ModelSignature>` describes model
            input and output :py:class:`Schema <mlflow.types.Schema>`. The model signature can
            be :py:func:`inferred <mlflow.models.infer_signature>` from datasets with valid
            model input (e.g. the training dataset with target column omitted) and valid model
            output (e.g. model predictions generated on the training dataset), for example:

            .. code-block:: python
                :caption: Example

                from pmdarima.arima import AutoARIMA
                from diviner import GroupedPmdarima
                from mlflow.models import infer_signature

                auto_arima_obj = AutoARIMA(out_of_sample_size=60, maxiter=100)
                base_auto_arima = GroupedPmdarima(model_template=auto_arima_obj).fit(
                    df=training_data,
                    group_key_columns=("region", "state"),
                    y_col="y",
                    datetime_col="ds",
                    silence_warnings=True,
                )
                predictions = base_auto_arima.predict(
                    n_periods=30, alpha=0.05, return_conf_int=True
                )
                signature = infer_signature(training_data, predictions)

        input_example: {{ input_example }}
        await_registration_for: Number of seconds to wait for the model version to finish
            being created and to reach ``READY`` status. By default, the function waits for
            five minutes. Specify 0 or None to skip waiting.
        pip_requirements: {{ pip_requirements }}
        extra_pip_requirements: {{ extra_pip_requirements }}
        metadata: {{ metadata }}
        kwargs: Additional arguments for :py:class:`mlflow.models.model.Model`. Additionally,
            for models that have been fit in Spark, the following configuration options are
            supported:

            - ``partition_by`` for setting a (or several) partition columns as a list of
              column names. Must be a list of strings of grouping key column(s).
            - ``partition_count`` for setting the number of part files to write from a
              repartition per ``partition_by`` group. The default part file count is 200.
            - ``dfs_tmpdir`` for specifying the DFS temporary location where the model will
              be stored while copying from a local file system to a Spark-supported "dbfs:/"
              scheme.

    Returns:
        A :py:class:`ModelInfo <mlflow.models.model.ModelInfo>` instance that contains the
        metadata of the logged model.
    """
    return Model.log(
        artifact_path=artifact_path,
        flavor=mlflow.diviner,
        registered_model_name=registered_model_name,
        diviner_model=diviner_model,
        conda_env=conda_env,
        code_paths=code_paths,
        signature=signature,
        input_example=input_example,
        await_registration_for=await_registration_for,
        pip_requirements=pip_requirements,
        extra_pip_requirements=extra_pip_requirements,
        metadata=metadata,
        **kwargs,
    )


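# Usage sketch for ``log_model`` (the run context, artifact path, and registry name
# are hypothetical).
#
#   import mlflow
#
#   with mlflow.start_run():
#       model_info = mlflow.diviner.log_model(
#           diviner_model=model,
#           artifact_path="diviner_model",
#           registered_model_name="grouped_prophet_demo",
#       )
#   print(model_info.model_uri)

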
class _DivinerModelWrapper:
    def __init__(self, diviner_model):
        self.diviner_model = diviner_model

    def get_raw_model(self):
        """
        Returns the underlying model.
        """
        return self.diviner_model

    def predict(self, dataframe, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
        """A method that allows a pyfunc implementation of this flavor to generate forecasted
        values from the end of a trained Diviner model's training series per group.

        The implementation here encapsulates a config-based switch of method calling. In short:

        * If the ``DataFrame`` supplied to this method contains a column ``groups`` whose
          first row of data is of type List[tuple[str]] (containing the series-identifying
          group keys that were generated to identify a single underlying model during
          training), the caller will resolve to the method ``predict_groups()`` in each of
          the underlying wrapped libraries (i.e., ``GroupedProphet.predict_groups()``).
        * If the ``DataFrame`` supplied does not contain the column name ``groups``, the
          primitive-driven forecasting method is utilized instead (for ``GroupedProphet``,
          the ``predict()`` method mirrors that of ``Prophet``'s, requiring a ``DataFrame``
          submitted with explicit datetime values per group, which is not a tenable
          implementation for pyfunc or RESTful serving). For ``GroupedProphet``, this is
          the ``.forecast()`` method, while for ``GroupedPmdarima``, this is the
          ``.predict()`` method.

        Args:
            dataframe: A ``pandas.DataFrame`` that contains the required configuration for
                the appropriate ``Diviner`` type. For example, for
                ``GroupedProphet.forecast()``:

                - horizon : int
                - frequency : str

                .. code-block:: python

                    predict_conf = pd.DataFrame({"horizon": 30, "frequency": "D"}, index=[0])
                    forecast = pyfunc.load_pyfunc(model_uri=model_path).predict(predict_conf)

                This will generate 30 days of forecasted values for each group that the model
                was trained on.
            params: Additional parameters to pass to the model for inference.

        Returns:
            A Pandas DataFrame containing the forecasted values for each group key that was
            either trained or declared as a subset with a ``groups`` entry in the
            ``dataframe`` configuration argument.
        """
        from diviner import GroupedPmdarima, GroupedProphet

        schema = dataframe.columns.values.tolist()
        conf = dataframe.to_dict(orient="index").get(0)

        # Required parameter extraction and validation
        horizon = conf.get("horizon", None)
        n_periods = conf.get("n_periods", None)
        if n_periods and horizon and n_periods != horizon:
            raise MlflowException(
                "The provided prediction configuration contains both `n_periods` and `horizon` "
                "with different values. Please provide only one of these integer values.",
                error_code=INVALID_PARAMETER_VALUE,
            )
        else:
            if not n_periods and horizon:
                n_periods = horizon

        if not n_periods:
            raise MlflowException(
                "The provided prediction configuration Pandas DataFrame does not contain either "
                "the `n_periods` or `horizon` columns. At least one of these must be specified "
                f"with a valid integer value. Configuration schema: {schema}.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        if not isinstance(n_periods, int):
            raise MlflowException(
                "The `n_periods` column contains invalid data. Supplied type must be an "
                f"integer. Type supplied: {type(n_periods)}",
                error_code=INVALID_PARAMETER_VALUE,
            )

        frequency = conf.get("frequency", None)

        if isinstance(self.diviner_model, GroupedProphet) and not frequency:
            raise MlflowException(
                "Diviner's GroupedProphet model requires a `frequency` value to be submitted "
                "in Pandas date_range format. The submitted configuration Pandas DataFrame "
                f"does not contain a `frequency` column. Configuration schema: {schema}.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        predict_col = conf.get("predict_col", None)

        predict_groups = conf.get("groups", None)

        if predict_groups and not isinstance(predict_groups, List):
            raise MlflowException(
                "Specifying a group subset for prediction requires groups to be defined as a "
                "List[(Tuple|List)[<group_keys>]]. Submitted group type: "
                f"{type(predict_groups)}.",
                error_code=INVALID_PARAMETER_VALUE,
            )

        # NB: json serialization of a tuple converts the tuple to a List. Diviner requires a
        # List of Tuples to be input to the group_prediction API. This conversion is for
        # utilizing the pyfunc flavor through the serving API.
        if predict_groups and not isinstance(predict_groups[0], Tuple):
            predict_groups = [tuple(group) for group in predict_groups]

        if isinstance(self.diviner_model, GroupedProphet):
            # We're wrapping two different endpoints to Diviner here for the pyfunc
            # implementation. Since we're limited by a single endpoint, we can address
            # redirecting to the method ``predict_groups()``, which will allow for a subset
            # of groups to be forecasted if the prediction configuration DataFrame contains a
            # List[tuple[str]] in the ``groups`` column. If this column is not present, all
            # groups will be used to generate forecasts, utilizing the less computationally
            # complex method ``forecast()``.
            if not predict_groups:
                prediction_df = self.diviner_model.forecast(
                    horizon=n_periods, frequency=frequency
                )
            else:
                group_kwargs = {
                    k: v for k, v in conf.items() if k in {"predict_col", "on_error"}
                }
                prediction_df = self.diviner_model.predict_groups(
                    groups=predict_groups,
                    horizon=n_periods,
                    frequency=frequency,
                    **group_kwargs,
                )
            if predict_col is not None:
                prediction_df.rename(columns={"yhat": predict_col}, inplace=True)
        elif isinstance(self.diviner_model, GroupedPmdarima):
            # As above, we're redirecting the prediction request to one of two different
            # methods for ``Diviner``'s pmdarima implementation. If the ``groups`` column is
            # present with a list of tuples of keys to look up, ``predict_groups()`` will be
            # used. Otherwise, the standard ``predict()`` method will be called to generate
            # forecasts for all groups that were trained on.
            restricted_keys = {"n_periods", "horizon", "frequency", "groups"}
            predict_conf = {k: v for k, v in conf.items() if k not in restricted_keys}
            if not predict_groups:
                prediction_df = self.diviner_model.predict(n_periods=n_periods, **predict_conf)
            else:
                prediction_df = self.diviner_model.predict_groups(
                    groups=predict_groups, n_periods=n_periods, **predict_conf
                )
        else:
            raise MlflowException(
                f"The Diviner model instance type '{type(self.diviner_model)}' is not supported "
                f"in version {mlflow.__version__} of MLflow.",
                error_code=INVALID_PARAMETER_VALUE,
            )
        return prediction_df


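# Illustrative sketch of the pyfunc prediction configuration handled above (values are
# hypothetical). Only the first row of the DataFrame is read; ``groups`` narrows the
# forecast to a subset of the trained group keys, and ``predict_col`` renames the
# GroupedProphet ``yhat`` output column.
#
#   import pandas as pd
#
#   predict_conf = pd.DataFrame(
#       {
#           "n_periods": 30,
#           "frequency": "D",
#           "groups": [[("northeast", "NY"), ("southwest", "AZ")]],
#           "predict_col": "forecast",
#       },
#       index=[0],
#   )
#   forecast = pyfunc_model.predict(predict_conf)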