import abc
import logging
import os
from typing import List, Optional
from mlflow.exceptions import MlflowException
from mlflow.protos.databricks_pb2 import BAD_REQUEST, INTERNAL_ERROR, INVALID_PARAMETER_VALUE
from mlflow.recipes import dag_help_strings
from mlflow.recipes.artifacts import Artifact
from mlflow.recipes.step import BaseStep, StepClass, StepStatus
from mlflow.recipes.utils import (
get_recipe_config,
get_recipe_name,
get_recipe_root_path,
)
from mlflow.recipes.utils.execution import (
clean_execution_state,
get_or_create_base_execution_directory,
get_step_output_path,
run_recipe_step,
)
from mlflow.recipes.utils.step import display_html
from mlflow.utils.class_utils import _get_class_from_string
_logger = logging.getLogger(__name__)
[docs]class BaseRecipe:
"""
Base Recipe
"""
def __init__(self, recipe_root_path: str, profile: str) -> None:
"""
Recipe base class.
Args:
recipe_root_path: String path to the directory under which the recipe template
such as recipe.yaml, profiles/{profile}.yaml and steps/{step_name}.py are defined.
profile: String specifying the profile name, with which
{recipe_root_path}/profiles/{profile}.yaml is read and merged with
recipe.yaml to generate the configuration to run the recipe.
"""
self._recipe_root_path = recipe_root_path
self._run_args = {}
self._profile = profile
self._name = get_recipe_name(recipe_root_path)
# self._steps contains concatenated ordered lists of step objects representing multiple
# disjoint DAGs. To keep it in sync with the underlying config file, it should be reloaded
# from config files using self._resolve_recipe_steps() at the beginning of __init__(),
# run(), and inspect(), and should not reload it elsewhere.
self._steps = self._resolve_recipe_steps()
self._recipe = get_recipe_config(self._recipe_root_path, self._profile).get("recipe")
@property
def name(self) -> str:
"""Returns the name of the recipe."""
return self._name
@property
def profile(self) -> str:
"""
Returns the profile under which the recipe and its steps will execute.
"""
return self._profile
[docs] def run(self, step: Optional[str] = None) -> None:
"""
Runs a step in the recipe, or the entire recipe if a step is not specified.
Args:
step: String name to run a step within the recipe. The step and its dependencies
will be run sequentially. If a step is not specified, the entire recipe is
executed.
Returns:
None
"""
# TODO Record performance here.
self._steps = self._resolve_recipe_steps()
target_step = self._get_step(step) if step else self._get_default_step()
last_executed_step = run_recipe_step(
self._recipe_root_path,
self._get_subgraph_for_target_step(target_step),
target_step,
self._recipe,
)
self.inspect(last_executed_step.name)
# Verify that the step execution succeeded and throw if it didn't.
last_executed_step_output_directory = get_step_output_path(
self._recipe_root_path, last_executed_step.name, ""
)
last_executed_step_state = last_executed_step.get_execution_state(
last_executed_step_output_directory
)
if last_executed_step_state.status != StepStatus.SUCCEEDED:
last_step_error_mesg = (
f"The following error occurred while running step '{last_executed_step}':\n"
f"{last_executed_step_state.stack_trace}\n"
f"Last step status: '{last_executed_step_state.status}'\n"
)
if step is not None:
raise MlflowException(
f"Failed to run step '{step}' of recipe '{self.name}':\n{last_step_error_mesg}",
error_code=BAD_REQUEST,
)
else:
raise MlflowException(
f"Failed to run recipe '{self.name}':\n{last_step_error_mesg}",
error_code=BAD_REQUEST,
)
[docs] def inspect(self, step: Optional[str] = None) -> None:
"""
Displays main output from a step, or a recipe DAG if no step is specified.
Args:
step: String name to display a step output within the recipe. If a step is not
specified, the DAG of the recipe is shown instead.
Returns:
None
"""
self._steps = self._resolve_recipe_steps()
if not step:
display_html(html_file_path=self._get_recipe_dag_file())
else:
output_directory = get_step_output_path(self._recipe_root_path, step, "")
self._get_step(step).inspect(output_directory)
[docs] def clean(self, step: Optional[str] = None) -> None:
"""
Removes the outputs of the specified step from the cache, or removes the cached outputs
of all steps if no particular step is specified. After cached outputs are cleaned
for a particular step, the step will be re-executed in its entirety the next time it is
invoked via ``BaseRecipe.run()``.
Args:
step: String name of the step to clean within the recipe. If not specified,
cached outputs are removed for all recipe steps.
"""
to_clean = self._steps if not step else [self._get_step(step)]
clean_execution_state(self._recipe_root_path, to_clean)
def _get_step(self, step_name) -> BaseStep:
"""Returns a step class object from the recipe."""
steps = self._steps
step_names = [s.name for s in steps]
if step_name not in step_names:
raise MlflowException(
f"Step {step_name} not found in recipe. Available steps are {step_names}"
)
return self._steps[step_names.index(step_name)]
def _get_subgraph_for_target_step(self, target_step: BaseStep) -> List[BaseStep]:
"""
Return a list of step objects representing a connected DAG containing the target_step.
The returned list should be a sublist of self._steps.
"""
subgraph = []
if target_step.step_class == StepClass.UNKNOWN:
return subgraph
for step in self._steps:
if target_step.step_class() == step.step_class():
subgraph.append(step)
return subgraph
@abc.abstractmethod
def _get_default_step(self) -> BaseStep:
"""
Defines which step to run if no step is specified.
Concrete recipe class should implement this method.
"""
@abc.abstractmethod
def _get_step_classes(self):
"""
Returns a list of step classes defined in the recipe.
Concrete recipe class should implement this method.
"""
def _get_recipe_dag_file(self) -> str:
"""
Returns absolute path to the recipe DAG representation HTML file.
"""
import jinja2
j2_env = jinja2.Environment(loader=jinja2.FileSystemLoader(os.path.dirname(__file__)))
recipe_dag_template = j2_env.get_template("resources/recipe_dag_template.html").render(
{
"recipe_yaml_help": {
"help_string_type": "yaml",
"help_string": dag_help_strings.RECIPE_YAML,
},
"ingest_step_help": {
"help_string": dag_help_strings.INGEST_STEP,
"help_string_type": "text",
},
"ingest_user_code_help": {
"help_string": dag_help_strings.INGEST_USER_CODE,
"help_string_type": "python",
},
"ingested_data_help": {
"help_string": dag_help_strings.INGESTED_DATA,
"help_string_type": "text",
},
"split_step_help": {
"help_string": dag_help_strings.SPLIT_STEP,
"help_string_type": "text",
},
"split_user_code_help": {
"help_string": dag_help_strings.SPLIT_USER_CODE,
"help_string_type": "python",
},
"training_data_help": {
"help_string": dag_help_strings.TRAINING_DATA,
"help_string_type": "text",
},
"validation_data_help": {
"help_string": dag_help_strings.VALIDATION_DATA,
"help_string_type": "text",
},
"test_data_help": {
"help_string": dag_help_strings.TEST_DATA,
"help_string_type": "text",
},
"transform_step_help": {
"help_string": dag_help_strings.TRANSFORM_STEP,
"help_string_type": "text",
},
"transform_user_code_help": {
"help_string": dag_help_strings.TRANSFORM_USER_CODE,
"help_string_type": "python",
},
"fitted_transformer_help": {
"help_string": dag_help_strings.FITTED_TRANSFORMER,
"help_string_type": "text",
},
"transformed_training_and_validation_data_help": {
"help_string": dag_help_strings.TRANSFORMED_TRAINING_AND_VALIDATION_DATA,
"help_string_type": "text",
},
"train_step_help": {
"help_string": dag_help_strings.TRAIN_STEP,
"help_string_type": "text",
},
"train_user_code_help": {
"help_string": dag_help_strings.TRAIN_USER_CODE,
"help_string_type": "python",
},
"fitted_model_help": {
"help_string": dag_help_strings.FITTED_MODEL,
"help_string_type": "text",
},
"mlflow_run_help": {
"help_string": dag_help_strings.MLFLOW_RUN,
"help_string_type": "text",
},
"predicted_training_data_help": {
"help_string": dag_help_strings.PREDICTED_TRAINING_DATA,
"help_string_type": "text",
},
"custom_metrics_user_code_help": {
"help_string": dag_help_strings.CUSTOM_METRICS_USER_CODE,
"help_string_type": "python",
},
"evaluate_step_help": {
"help_string": dag_help_strings.EVALUATE_STEP,
"help_string_type": "text",
},
"model_validation_status_help": {
"help_string": dag_help_strings.MODEL_VALIDATION_STATUS,
"help_string_type": "text",
},
"register_step_help": {
"help_string": dag_help_strings.REGISTER_STEP,
"help_string_type": "text",
},
"registered_model_version_help": {
"help_string": dag_help_strings.REGISTERED_MODEL_VERSION,
"help_string_type": "text",
},
"ingest_scoring_step_help": {
"help_string": dag_help_strings.INGEST_SCORING_STEP,
"help_string_type": "text",
},
"ingested_scoring_data_help": {
"help_string": dag_help_strings.INGESTED_SCORING_DATA,
"help_string_type": "text",
},
"predict_step_help": {
"help_string": dag_help_strings.PREDICT_STEP,
"help_string_type": "text",
},
"scored_data_help": {
"help_string": dag_help_strings.SCORED_DATA,
"help_string_type": "text",
},
}
)
recipe_dag_file = os.path.join(
get_or_create_base_execution_directory(self._recipe_root_path), "recipe_dag.html"
)
with open(recipe_dag_file, "w") as f:
f.write(recipe_dag_template)
return recipe_dag_file
def _resolve_recipe_steps(self) -> List[BaseStep]:
"""
Constructs and returns all recipe step objects from the recipe configuration.
"""
recipe_config = get_recipe_config(self._recipe_root_path, self._profile)
recipe_config["profile"] = self.profile
return [
s.from_recipe_config(recipe_config, self._recipe_root_path)
for s in self._get_step_classes()
]
[docs] def get_artifact(self, artifact_name: str):
"""
Read an artifact from recipe output. artifact names can be obtained from
`Recipe.inspect()` or `Recipe.run()` output.
Returns None if the specified artifact is not found.
Raise an error if the artifact is not supported.
"""
return self._get_artifact(artifact_name).load()
def _get_artifact(self, artifact_name: str) -> Artifact:
"""
Read an Artifact object from recipe output. artifact names can be obtained
from `Recipe.inspect()` or `Recipe.run()` output.
Returns None if the specified artifact is not found.
Raise an error if the artifact is not supported.
"""
for step in self._steps:
for artifact in step.get_artifacts():
if artifact.name() == artifact_name:
return artifact
raise MlflowException(
f"The artifact with name '{artifact_name}' is not supported.",
error_code=INVALID_PARAMETER_VALUE,
)
[docs]class Recipe:
"""
A factory class that creates an instance of a recipe for a particular ML problem
(e.g. regression, classification) or MLOps task (e.g. batch scoring) based on the current
working directory and supplied configuration.
.. code-block:: python
:caption: Example
import os
from mlflow.recipes import Recipe
os.chdir("~/recipes-regression-template")
regression_recipe = Recipe(profile="local")
regression_recipe.run(step="train")
"""
[docs] def __new__(cls, profile: str):
"""
Creates an instance of an MLflow Recipe for a particular ML problem or MLOps task based
on the current working directory and supplied configuration. The current working directory
must be the root directory of an MLflow Recipe repository or a subdirectory of an
MLflow Recipe repository.
Args:
profile: The name of the profile to use for configuring the problem-specific or
task-specific recipe. Profiles customize the configuration of
one or more recipe steps, and recipe executions with different profiles
often produce different results.
Returns:
A recipe for a particular ML problem or MLOps task. For example, an instance of
`RegressionRecipe <https://github.com/mlflow/recipes-regression-template>`_
for regression problems.
.. code-block:: python
import os
from mlflow.recipes import Recipe
os.chdir("~/recipes-regression-template")
regression_recipe = Recipe(profile="local")
regression_recipe.run(step="train")
"""
if not profile:
raise MlflowException(
"A profile name must be provided to construct a valid Recipe object.",
error_code=INVALID_PARAMETER_VALUE,
) from None
recipe_root_path = get_recipe_root_path()
if " " in recipe_root_path:
raise MlflowException(
message=(
"Recipe directory path cannot contain spaces. Please move or rename your "
f"recipe directory. Current path: {recipe_root_path}"
),
error_code=INVALID_PARAMETER_VALUE,
) from None
recipe_config = get_recipe_config(recipe_root_path=recipe_root_path, profile=profile)
recipe = recipe_config.get("recipe")
if recipe is None:
raise MlflowException(
"The `recipe` property needs to be defined in the `recipe.yaml` file. "
"For example: `recipe: regression/v1`",
error_code=INVALID_PARAMETER_VALUE,
) from None
recipe_path = recipe.replace("/", ".").replace("@", ".")
class_name = f"mlflow.recipes.{recipe_path}.RecipeImpl"
try:
recipe_class_module = _get_class_from_string(class_name)
except Exception as e:
if isinstance(e, ModuleNotFoundError):
raise MlflowException(
f"Failed to find Recipe {class_name}."
f"Please check the correctness of the recipe template setting: {recipe}",
error_code=INVALID_PARAMETER_VALUE,
) from None
else:
raise MlflowException(
f"Failed to construct Recipe {class_name}. Error: {e!r}",
error_code=INTERNAL_ERROR,
) from None
recipe_name = get_recipe_name(recipe_root_path)
_logger.info(f"Creating MLflow Recipe '{recipe_name}' with profile: '{profile}'")
return recipe_class_module(recipe_root_path, profile)