Source code for mlflow.entities.assessment

from __future__ import annotations

import json
import time
from dataclasses import dataclass
from typing import Any, Optional, Union

from google.protobuf.json_format import MessageToDict, ParseDict
from google.protobuf.struct_pb2 import Value
from google.protobuf.timestamp_pb2 import Timestamp

from mlflow.entities._mlflow_object import _MlflowObject
from mlflow.entities.assessment_error import AssessmentError
from mlflow.entities.assessment_source import AssessmentSource, AssessmentSourceType  # noqa: F401
from mlflow.exceptions import MlflowException
from mlflow.protos.assessments_pb2 import Assessment as ProtoAssessment
from mlflow.protos.assessments_pb2 import Expectation as ProtoExpectation
from mlflow.protos.assessments_pb2 import Feedback as ProtoFeedback
from mlflow.utils.annotations import experimental

# Accepted shapes for a feedback value:
#   * a scalar: float, int, str or bool
#   * a list whose items are scalars of the kinds listed above
#   * a dict keyed by strings whose values are scalars of the kinds listed above
PbValueType = Union[float, int, str, bool]
FeedbackValueType = Union[
    PbValueType,
    dict[str, PbValueType],
    list[PbValueType],
]


@experimental
@dataclass
class Assessment(_MlflowObject):
    """
    An abstraction for annotating a trace. An Assessment should be one of the following types:

    - Expectations: A label that represents the expected value for a particular operation.
        For example, an expected answer for a user question from a chatbot.
    - Feedback: A label that represents the feedback on the quality of the operation.
        Feedback can come from different sources, such as human judges, heuristic scorers,
        or LLM-as-a-Judge.

    You can log an assessment to a trace using the :py:func:`mlflow.log_expectation` or
    :py:func:`mlflow.log_feedback` functions.

    Args:
        name: The name of the assessment.
        source: The source of the assessment.
        trace_id: The ID of the trace associated with the assessment. If unset, the assessment
            is not associated with any trace yet.
        expectation: The expectation value of the assessment.
        feedback: The feedback value of the assessment. Only one of `expectation` or `feedback`
            should be specified.
        rationale: The rationale / justification for the assessment.
        metadata: The metadata associated with the assessment.
        span_id: The ID of the span associated with the assessment, if the assessment should
            be associated with a particular span in the trace.
        create_time_ms: The creation time of the assessment in milliseconds. If unset, the
            current time is used.
        last_update_time_ms: The last update time of the assessment in milliseconds.
            If unset, the current time is used.
        assessment_id: The ID of the assessment. This must be generated in the backend.
        error: Deprecated, set ``error`` on the :py:class:`Feedback` object instead. When given,
            it is copied onto ``feedback.error``; it cannot be combined with ``expectation``.
    """

    name: str
    source: AssessmentSource
    # NB: The trace ID is optional because the assessment object itself may be created
    #   standalone. For example, a custom metric function returns an assessment object
    #   without a trace ID. That said, the trace ID is required when logging the
    #   assessment to a trace in the backend eventually.
    #   https://docs.databricks.com/aws/en/generative-ai/agent-evaluation/custom-metrics#-metric-decorator
    trace_id: Optional[str] = None
    expectation: Optional[Expectation] = None
    feedback: Optional[Feedback] = None
    rationale: Optional[str] = None
    metadata: Optional[dict[str, str]] = None
    span_id: Optional[str] = None
    create_time_ms: Optional[int] = None
    last_update_time_ms: Optional[int] = None
    # NB: The assessment ID should always be generated in the backend. The CreateAssessment
    #   backend API asks for an incomplete Assessment object without an ID and returns a
    #   complete one with assessment_id, so the ID is Optional in the constructor here.
    assessment_id: Optional[str] = None
    # Deprecated, use `error` in Feedback instead. Just kept for backward compatibility
    # and will be removed in the 3.0.0 release.
    error: Optional[AssessmentError] = None

    def __post_init__(self):
        """
        Validate the value fields, forward the deprecated ``error`` and default the timestamps.

        Raises:
            MlflowException: If not exactly one of ``expectation`` / ``feedback`` is set, or
                if ``error`` is combined with ``expectation``.
        """
        # Summing the two booleans counts how many of the value fields were provided.
        if (self.expectation is not None) + (self.feedback is not None) != 1:
            raise MlflowException.invalid_parameter_value(
                "Exactly one of `expectation` or `feedback` should be specified.",
            )

        # Populate the error field to the feedback object
        if self.error is not None:
            if self.expectation is not None:
                raise MlflowException.invalid_parameter_value(
                    "Cannot set `error` when `expectation` is specified.",
                )
            if self.feedback is None:
                raise MlflowException.invalid_parameter_value(
                    "Cannot set `error` when `feedback` is not specified.",
                )
            self.feedback.error = self.error

        # Set timestamp if not provided
        current_time = int(time.time() * 1000)  # milliseconds
        if self.create_time_ms is None:
            self.create_time_ms = current_time
        if self.last_update_time_ms is None:
            self.last_update_time_ms = current_time

    def to_proto(self):
        """
        Convert this assessment into a ``ProtoAssessment`` message.

        Optional fields that are ``None`` (``trace_id``, ``span_id``, ``rationale``,
        ``assessment_id``) and empty ``metadata`` are left unset on the message.
        """
        assessment = ProtoAssessment()
        assessment.assessment_name = self.name
        # The trace ID may legitimately be None for a standalone assessment. Protobuf
        # string fields reject None, so only assign it when it is actually present.
        if self.trace_id is not None:
            assessment.trace_id = self.trace_id

        assessment.source.CopyFrom(self.source.to_proto())

        # Convert time in milliseconds to protobuf Timestamp
        assessment.create_time.FromMilliseconds(self.create_time_ms)
        assessment.last_update_time.FromMilliseconds(self.last_update_time_ms)

        if self.span_id is not None:
            assessment.span_id = self.span_id
        if self.rationale is not None:
            assessment.rationale = self.rationale
        if self.assessment_id is not None:
            assessment.assessment_id = self.assessment_id

        if self.expectation is not None:
            assessment.expectation.CopyFrom(self.expectation.to_proto())
        elif self.feedback is not None:
            assessment.feedback.CopyFrom(self.feedback.to_proto())

        if self.metadata:
            assessment.metadata.update(self.metadata)

        return assessment

    @classmethod
    def from_proto(cls, proto):
        """
        Build an assessment from a ``ProtoAssessment`` message.

        Empty string fields (``assessment_id``, ``trace_id``, ``rationale``, ``span_id``) and
        empty ``metadata`` are mapped back to ``None``.

        Raises:
            MlflowException: If the message carries neither an expectation nor a feedback
                (raised by the constructor validation).
        """
        value_kind = proto.WhichOneof("value")
        expectation = (
            Expectation.from_proto(proto.expectation) if value_kind == "expectation" else None
        )
        feedback = Feedback.from_proto(proto.feedback) if value_kind == "feedback" else None

        # Convert ScalarMapContainer to a normal Python dict
        metadata = dict(proto.metadata) if proto.metadata else None

        return cls(
            assessment_id=proto.assessment_id or None,
            # An unset proto string reads back as ""; normalize it to None like the other
            # optional string fields so that it matches what from_dictionary produces.
            trace_id=proto.trace_id or None,
            name=proto.assessment_name,
            source=AssessmentSource.from_proto(proto.source),
            create_time_ms=proto.create_time.ToMilliseconds(),
            last_update_time_ms=proto.last_update_time.ToMilliseconds(),
            expectation=expectation,
            feedback=feedback,
            rationale=proto.rationale or None,
            metadata=metadata,
            span_id=proto.span_id or None,
        )

    def to_dictionary(self):
        """Return the dictionary form of this assessment's protobuf message."""
        # Note that MessageToDict excludes None fields. For example, if assessment_id is None,
        # it won't be included in the resulting dictionary.
        return MessageToDict(self.to_proto(), preserving_proto_field_name=True)

    @classmethod
    def from_dictionary(cls, d: dict[str, Any]) -> "Assessment":
        """
        Build an assessment from a dictionary shaped like the output of ``to_dictionary``.

        ``create_time``, ``last_update_time``, ``assessment_name`` and ``source`` are required
        keys; the two timestamps are parsed with ``Timestamp.FromJsonString``. All other keys
        are optional and default to ``None``.
        """
        # A single Timestamp instance is reused to parse both time strings.
        t = Timestamp()
        t.FromJsonString(d["create_time"])
        create_time_ms = t.ToMilliseconds()
        t.FromJsonString(d["last_update_time"])
        last_update_time_ms = t.ToMilliseconds()
        return cls(
            assessment_id=d.get("assessment_id"),
            trace_id=d.get("trace_id"),
            name=d["assessment_name"],
            source=AssessmentSource.from_dictionary(d["source"]),
            create_time_ms=create_time_ms,
            last_update_time_ms=last_update_time_ms,
            expectation=Expectation.from_dictionary(e) if (e := d.get("expectation")) else None,
            feedback=Feedback.from_dictionary(f) if (f := d.get("feedback")) else None,
            rationale=d.get("rationale"),
            metadata=d.get("metadata"),
            span_id=d.get("span_id"),
        )
_JSON_SERIALIZATION_FORMAT = "JSON_FORMAT"


@experimental
@dataclass
class Expectation(_MlflowObject):
    """
    Represents an expectation about the output of an operation, such as the expected response
    that a generative AI application should provide to a particular user query.

    Args:
        value: The expected value of the operation. This can be any JSON-serializable value.
    """

    value: Any

    def to_proto(self):
        """
        Convert this expectation into a ``ProtoExpectation`` message.

        ``None`` and scalar values (int, float, bool, str) are stored as a protobuf ``Value``;
        anything else is stored as a JSON string in ``serialized_value``.

        Raises:
            MlflowException: If the value has to be JSON-serialized and ``json.dumps`` fails.
        """
        if self._need_serialization():
            try:
                serialized_value = json.dumps(self.value)
            except Exception as e:
                raise MlflowException.invalid_parameter_value(
                    f"Failed to serialize value {self.value} to JSON string. "
                    "Expectation value must be JSON-serializable."
                ) from e

            return ProtoExpectation(
                serialized_value=ProtoExpectation.SerializedValue(
                    serialization_format=_JSON_SERIALIZATION_FORMAT,
                    value=serialized_value,
                )
            )

        return ProtoExpectation(value=ParseDict(self.value, Value()))

    @classmethod
    def from_proto(cls, proto) -> "Expectation":
        """
        Build an expectation from a ``ProtoExpectation`` message.

        Raises:
            MlflowException: If ``serialized_value`` uses a format other than JSON_FORMAT.
        """
        if proto.HasField("serialized_value"):
            if proto.serialized_value.serialization_format != _JSON_SERIALIZATION_FORMAT:
                raise MlflowException.invalid_parameter_value(
                    f"Unknown serialization format: {proto.serialized_value.serialization_format}. "
                    "Only JSON_FORMAT is supported."
                )
            return cls(value=json.loads(proto.serialized_value.value))
        else:
            return cls(value=MessageToDict(proto.value))

    def to_dictionary(self):
        """Return ``{"value": <value>}``."""
        return {"value": self.value}

    @classmethod
    def from_dictionary(cls, d):
        """
        Build an expectation from its dictionary form.

        Two shapes are accepted:

        - ``{"value": ...}``, as returned by :py:meth:`to_dictionary`.
        - ``{"serialized_value": {"serialization_format": ..., "value": "<json>"}}``, which
          is what ``Assessment.to_dictionary`` emits (via ``MessageToDict``) for values that
          ``to_proto`` stores as a JSON string, such as lists and dicts.

        Raises:
            MlflowException: If ``serialized_value`` uses a format other than JSON_FORMAT.
            KeyError: If neither ``value`` nor ``serialized_value`` is present.
        """
        if "value" not in d and "serialized_value" in d:
            serialized = d["serialized_value"]
            serialization_format = serialized.get("serialization_format")
            if serialization_format != _JSON_SERIALIZATION_FORMAT:
                raise MlflowException.invalid_parameter_value(
                    f"Unknown serialization format: {serialization_format}. "
                    "Only JSON_FORMAT is supported."
                )
            return cls(value=json.loads(serialized["value"]))
        return cls(d["value"])

    def _need_serialization(self):
        """Return True when the value must be stored as a JSON string in the proto."""
        # None and scalars (int, float, bool, str) are stored directly as a protobuf Value;
        # every other value, such as a list or dict, is serialized as a JSON string.
        return self.value is not None and not isinstance(self.value, (int, float, bool, str))


@experimental
@dataclass
class Feedback(_MlflowObject):
    """
    Represents feedback about the output of an operation. For example, if the response from a
    generative AI application to a particular user query is correct, then a human or LLM judge
    may provide feedback with the value ``"correct"``.

    Args:
        value: The feedback value. This can be one of the following types:
            - float
            - int
            - str
            - bool
            - list of values of the same types as above
            - dict with string keys and values of the same types as above
        error: An optional error associated with the feedback. This is used to indicate
            that the feedback is not valid or cannot be processed.
    """

    value: FeedbackValueType
    error: Optional[AssessmentError] = None

    def to_proto(self):
        """Convert this feedback into a ``ProtoFeedback`` message."""
        return ProtoFeedback(
            value=ParseDict(self.value, Value(), ignore_unknown_fields=True),
            error=self.error.to_proto() if self.error else None,
        )

    @classmethod
    def from_proto(cls, proto) -> "Feedback":
        """Build a feedback from a ``ProtoFeedback`` message; ``error`` is None when unset."""
        # Use cls rather than the hard-coded class name so the classmethod matches
        # from_dictionary and returns the subclass when called on one.
        return cls(
            value=MessageToDict(proto.value),
            error=AssessmentError.from_proto(proto.error) if proto.HasField("error") else None,
        )

    def to_dictionary(self):
        """Return the dictionary form of this feedback's protobuf message."""
        return MessageToDict(self.to_proto(), preserving_proto_field_name=True)

    @classmethod
    def from_dictionary(cls, d):
        """Build a feedback from a dictionary with a ``value`` key and an optional ``error``."""
        return cls(
            value=d["value"],
            error=AssessmentError.from_dictionary(err) if (err := d.get("error")) else None,
        )