from __future__ import annotations
import json
import time
from dataclasses import dataclass
from typing import Any, Optional, Union
from google.protobuf.json_format import MessageToDict, ParseDict
from google.protobuf.struct_pb2 import Value
from google.protobuf.timestamp_pb2 import Timestamp
from mlflow.entities._mlflow_object import _MlflowObject
from mlflow.entities.assessment_error import AssessmentError
from mlflow.entities.assessment_source import AssessmentSource, AssessmentSourceType # noqa: F401
from mlflow.exceptions import MlflowException
from mlflow.protos.assessments_pb2 import Assessment as ProtoAssessment
from mlflow.protos.assessments_pb2 import Expectation as ProtoExpectation
from mlflow.protos.assessments_pb2 import Feedback as ProtoFeedback
from mlflow.utils.annotations import experimental
# Scalar types accepted as a feedback value.
PbValueType = Union[float, int, str, bool]

# A feedback value is one of:
#   - a scalar (float, int, str, or bool)
#   - a list of such scalars
#   - a dict with string keys whose values are such scalars
FeedbackValueType = Union[
    PbValueType,
    dict[str, PbValueType],
    list[PbValueType],
]
@experimental
@dataclass
class Assessment(_MlflowObject):
    """
    An abstraction for annotating a trace. An Assessment should be one of the following types:

    - Expectations: A label that represents the expected value for a particular operation.
        For example, an expected answer for a user question from a chatbot.
    - Feedback: A label that represents the feedback on the quality of the operation.
        Feedback can come from different sources, such as human judges, heuristic scorers,
        or LLM-as-a-Judge.

    You can log an assessment to a trace using the :py:func:`mlflow.log_expectation` or
    :py:func:`mlflow.log_feedback` functions.

    Args:
        name: The name of the assessment.
        source: The source of the assessment.
        trace_id: The ID of the trace associated with the assessment. If unset, the assessment
            is not associated with any trace yet.
        expectation: The expectation value of the assessment.
        feedback: The feedback value of the assessment. Only one of `expectation` or `feedback`
            should be specified.
        rationale: The rationale / justification for the assessment.
        metadata: The metadata associated with the assessment.
        span_id: The ID of the span associated with the assessment, if the assessment should
            be associated with a particular span in the trace.
        create_time_ms: The creation time of the assessment in milliseconds. If unset, the
            current time is used.
        last_update_time_ms: The last update time of the assessment in milliseconds.
            If unset, the current time is used.
        assessment_id: The ID of the assessment. This must be generated in the backend.
    """

    name: str
    source: AssessmentSource
    # NB: The trace ID is optional because the assessment object itself may be created
    #   standalone. For example, a custom metric function returns an assessment object
    #   without a trace ID. That said, the trace ID is required when logging the
    #   assessment to a trace in the backend eventually.
    #   https://docs.databricks.com/aws/en/generative-ai/agent-evaluation/custom-metrics#-metric-decorator
    trace_id: Optional[str] = None
    expectation: Optional[Expectation] = None
    feedback: Optional[Feedback] = None
    rationale: Optional[str] = None
    metadata: Optional[dict[str, str]] = None
    span_id: Optional[str] = None
    create_time_ms: Optional[int] = None
    last_update_time_ms: Optional[int] = None
    # NB: The assessment ID should always be generated in the backend. The CreateAssessment
    #   backend API asks for an incomplete Assessment object without an ID and returns a
    #   complete one with assessment_id, so the ID is Optional in the constructor here.
    assessment_id: Optional[str] = None
    # Deprecated, use `error` in Feedback instead. Just kept for backward compatibility
    # and will be removed in the 3.0.0 release.
    error: Optional[AssessmentError] = None

    def __post_init__(self):
        """
        Validate the value fields and fill in default timestamps.

        Raises:
            MlflowException: If not exactly one of `expectation` / `feedback` is set, or if
                the deprecated `error` field is combined with an `expectation`.
        """
        if (self.expectation is not None) + (self.feedback is not None) != 1:
            raise MlflowException.invalid_parameter_value(
                "Exactly one of `expectation` or `feedback` should be specified.",
            )

        # Populate the error field to the feedback object
        if self.error is not None:
            if self.expectation is not None:
                raise MlflowException.invalid_parameter_value(
                    "Cannot set `error` when `expectation` is specified.",
                )
            # The exactly-one check above guarantees `feedback` is set on this path.
            self.feedback.error = self.error

        # Set timestamp if not provided; both defaults share one clock reading.
        current_time = int(time.time() * 1000)  # milliseconds
        if self.create_time_ms is None:
            self.create_time_ms = current_time
        if self.last_update_time_ms is None:
            self.last_update_time_ms = current_time

    def to_proto(self):
        """
        Convert this assessment to its ``ProtoAssessment`` message.

        Optional string fields (`trace_id`, `span_id`, `rationale`, `assessment_id`) are
        only written when they are not None.
        """
        assessment = ProtoAssessment()
        assessment.assessment_name = self.name
        # `trace_id` is Optional on the entity; assigning None to a protobuf string
        # field raises TypeError, so leave the field unset in that case.
        if self.trace_id is not None:
            assessment.trace_id = self.trace_id

        assessment.source.CopyFrom(self.source.to_proto())

        # Convert time in milliseconds to protobuf Timestamp
        assessment.create_time.FromMilliseconds(self.create_time_ms)
        assessment.last_update_time.FromMilliseconds(self.last_update_time_ms)

        if self.span_id is not None:
            assessment.span_id = self.span_id
        if self.rationale is not None:
            assessment.rationale = self.rationale
        if self.assessment_id is not None:
            assessment.assessment_id = self.assessment_id

        if self.expectation is not None:
            assessment.expectation.CopyFrom(self.expectation.to_proto())
        elif self.feedback is not None:
            assessment.feedback.CopyFrom(self.feedback.to_proto())

        if self.metadata:
            assessment.metadata.update(self.metadata)

        return assessment

    @classmethod
    def from_proto(cls, proto):
        """
        Build an assessment from a ``ProtoAssessment`` message.

        Empty string fields on the message are mapped to None on the entity.

        Raises:
            MlflowException: If the message carries neither an expectation nor a feedback
                (raised by ``__post_init__``).
        """
        value_kind = proto.WhichOneof("value")
        expectation = (
            Expectation.from_proto(proto.expectation) if value_kind == "expectation" else None
        )
        feedback = Feedback.from_proto(proto.feedback) if value_kind == "feedback" else None

        # Convert ScalarMapContainer to a normal Python dict
        metadata = dict(proto.metadata) if proto.metadata else None

        return cls(
            assessment_id=proto.assessment_id or None,
            trace_id=proto.trace_id or None,
            name=proto.assessment_name,
            source=AssessmentSource.from_proto(proto.source),
            create_time_ms=proto.create_time.ToMilliseconds(),
            last_update_time_ms=proto.last_update_time.ToMilliseconds(),
            expectation=expectation,
            feedback=feedback,
            rationale=proto.rationale or None,
            metadata=metadata,
            span_id=proto.span_id or None,
        )

    def to_dictionary(self):
        """Return the assessment as a dict derived from its proto representation."""
        # Note that MessageToDict excludes None fields. For example, if assessment_id is None,
        # it won't be included in the resulting dictionary.
        return MessageToDict(self.to_proto(), preserving_proto_field_name=True)

    @classmethod
    def from_dictionary(cls, d: dict[str, Any]) -> "Assessment":
        """
        Build an assessment from a dict in the shape produced by :py:meth:`to_dictionary`.

        ``create_time`` and ``last_update_time`` are required and parsed as protobuf
        Timestamp JSON strings; ``assessment_name`` and ``source`` are required as well.
        """
        t = Timestamp()
        t.FromJsonString(d["create_time"])
        create_time_ms = t.ToMilliseconds()
        t.FromJsonString(d["last_update_time"])
        last_update_time_ms = t.ToMilliseconds()

        return cls(
            assessment_id=d.get("assessment_id"),
            trace_id=d.get("trace_id"),
            name=d["assessment_name"],
            source=AssessmentSource.from_dictionary(d["source"]),
            create_time_ms=create_time_ms,
            last_update_time_ms=last_update_time_ms,
            expectation=Expectation.from_dictionary(e) if (e := d.get("expectation")) else None,
            feedback=Feedback.from_dictionary(f) if (f := d.get("feedback")) else None,
            rationale=d.get("rationale"),
            metadata=d.get("metadata"),
            span_id=d.get("span_id"),
        )
# Format tag written to / expected in ``ProtoExpectation.SerializedValue``.
_JSON_SERIALIZATION_FORMAT = "JSON_FORMAT"


@experimental
@dataclass
class Expectation(_MlflowObject):
    """
    Represents an expectation about the output of an operation, such as the expected response
    that a generative AI application should provide to a particular user query.

    Args:
        value: The expected value of the operation. This can be any JSON-serializable value.
    """

    value: Any

    def to_proto(self):
        """
        Convert this expectation to a ``ProtoExpectation`` message.

        None and primitive values (int, float, bool, str) are stored in the ``value``
        field; anything else is JSON-encoded into ``serialized_value``.

        Raises:
            MlflowException: If a non-primitive value cannot be serialized to JSON.
        """
        if self._need_serialization():
            try:
                serialized_value = json.dumps(self.value)
            except Exception as e:
                raise MlflowException.invalid_parameter_value(
                    f"Failed to serialize value {self.value} to JSON string. "
                    "Expectation value must be JSON-serializable."
                ) from e

            return ProtoExpectation(
                serialized_value=ProtoExpectation.SerializedValue(
                    serialization_format=_JSON_SERIALIZATION_FORMAT,
                    value=serialized_value,
                )
            )

        return ProtoExpectation(value=ParseDict(self.value, Value()))

    @classmethod
    def from_proto(cls, proto) -> "Expectation":
        """
        Build an expectation from a ``ProtoExpectation`` message.

        Raises:
            MlflowException: If the message has a ``serialized_value`` whose format is
                not ``JSON_FORMAT``.
        """
        if proto.HasField("serialized_value"):
            if proto.serialized_value.serialization_format != _JSON_SERIALIZATION_FORMAT:
                raise MlflowException.invalid_parameter_value(
                    f"Unknown serialization format: {proto.serialized_value.serialization_format}. "
                    "Only JSON_FORMAT is supported."
                )
            return cls(value=json.loads(proto.serialized_value.value))
        else:
            return cls(value=MessageToDict(proto.value))

    def to_dictionary(self):
        """Return ``{"value": <raw value>}``."""
        return {"value": self.value}

    @classmethod
    def from_dictionary(cls, d):
        """
        Build an expectation from a dict.

        Two shapes are accepted:

        - ``{"value": ...}``, as produced by :py:meth:`to_dictionary`.
        - ``{"serialized_value": {"serialization_format": ..., "value": "<json>"}}``, the
          shape ``MessageToDict`` produces for a proto built by :py:meth:`to_proto` from a
          non-primitive value.

        Raises:
            MlflowException: If ``serialized_value`` uses a format other than ``JSON_FORMAT``.
            KeyError: If the dict contains neither ``value`` nor ``serialized_value``.
        """
        serialized = d.get("serialized_value")
        if "value" not in d and serialized is not None:
            serialization_format = serialized.get("serialization_format")
            if serialization_format != _JSON_SERIALIZATION_FORMAT:
                raise MlflowException.invalid_parameter_value(
                    f"Unknown serialization format: {serialization_format}. "
                    "Only JSON_FORMAT is supported."
                )
            return cls(value=json.loads(serialized["value"]))
        return cls(d["value"])

    def _need_serialization(self):
        # Only non-primitive values (e.g. lists, dicts) are serialized as a JSON string.
        # None and int/float/bool/str are passed to the proto `value` field as-is.
        return self.value is not None and not isinstance(self.value, (int, float, bool, str))
@experimental
@dataclass
class Feedback(_MlflowObject):
    """
    Represents feedback about the output of an operation. For example, if the response from a
    generative AI application to a particular user query is correct, then a human or LLM judge
    may provide feedback with the value ``"correct"``.

    Args:
        value: The feedback value. This can be one of the following types:
            - float
            - int
            - str
            - bool
            - list of values of the same types as above
            - dict with string keys and values of the same types as above
        error: An optional error associated with the feedback. This is used to indicate
            that the feedback is not valid or cannot be processed.
    """

    value: FeedbackValueType
    error: Optional[AssessmentError] = None

    def to_proto(self):
        """Convert this feedback to a ``ProtoFeedback`` message."""
        return ProtoFeedback(
            value=ParseDict(self.value, Value(), ignore_unknown_fields=True),
            error=self.error.to_proto() if self.error else None,
        )

    @classmethod
    def from_proto(cls, proto) -> "Feedback":
        """Build a feedback from a ``ProtoFeedback`` message."""
        # Construct through `cls` so that subclasses get an instance of their own type.
        return cls(
            value=MessageToDict(proto.value),
            error=AssessmentError.from_proto(proto.error) if proto.HasField("error") else None,
        )

    def to_dictionary(self):
        """Return the feedback as a dict derived from its proto representation."""
        return MessageToDict(self.to_proto(), preserving_proto_field_name=True)

    @classmethod
    def from_dictionary(cls, d):
        """
        Build a feedback from a dict with an optional ``value`` and an optional ``error``.

        A missing ``value`` key yields ``value=None`` rather than raising, so a dict that
        only carries an ``error`` can still be loaded.
        """
        return cls(
            value=d.get("value"),
            error=AssessmentError.from_dictionary(err) if (err := d.get("error")) else None,
        )