ResponsesAgent for Model Serving
The ResponsesAgent class in MLflow provides a specialized interface for serving generative AI models that handle structured responses with tool calling capabilities. This agent is designed to work seamlessly with MLflow's serving infrastructure while providing compatibility with OpenAI-style APIs.
Overview
The ResponsesAgent extends MLflow's PyFunc model interface to support conversational AI applications that require advanced capabilities such as multi-turn dialogue, tool-calling, multi-agent orchestration, and compatibility with OpenAI's Responses API and MLflow model tracking.
- Structured request/response handling
- Tool calling and function execution
- Chat history management
- Token usage tracking
- OpenAI API compatibility
- Support for returning multiple output messages, including intermediate outputs from tool-calling
- Support for multi-agent scenarios
- Compatibility with MLflow logging, tracing, and model serving
- Compatibility with the OpenAI Responses API for integration with downstream clients and UIs
This makes it ideal for building and deploying chatbots, virtual assistants, and other conversational AI applications.
Key Features
Structured Response Handling
The ResponsesAgent processes structured inputs and outputs that conform to the OpenAI Responses API format:
- Messages: Handles conversation history with role-based messages (system, user, assistant)
- Tool Calls: Supports function calling with structured parameters
- Usage Tracking: Monitors token consumption and model performance
- Metadata: Captures additional context and configuration
OpenAI API Compatibility
ResponsesAgent is designed for compatibility with OpenAI's Responses API, so it can integrate with existing applications and tools built against that API. This compatibility covers request formats, response structures, and streaming events.
MLflow Integration
Full integration with MLflow's ecosystem including:
- Model tracking and versioning
- Experiment management
- Model registry
- Deployment options
Basic Usage
Implementing a ResponsesAgent
Getting started
To create your own agent, subclass mlflow.pyfunc.ResponsesAgent and implement your agent logic in the predict method. The implementation is framework-agnostic, allowing you to use any agent authoring framework. Note that pydantic>=2 is required to use ResponsesAgent. For example implementations, see the simple chat agent and the tool calling agent below.
Creating agent output
When implementing your agent, you'll work with two main output types: ResponsesAgentResponse and ResponsesAgentStreamEvent. These are the only pydantic objects you should create directly. The remaining classes in mlflow.types.responses_helpers are only for validating dictionaries.
If you want to return outputs that don't fit into the standard interface, you can use the custom_outputs field.
Below are some helper methods you can use to create common outputs within the ResponsesAgent interface:
- mlflow.pyfunc.ResponsesAgent.create_text_output_item()
- mlflow.pyfunc.ResponsesAgent.create_function_call_item()
- mlflow.pyfunc.ResponsesAgent.create_function_call_output_item()
- mlflow.pyfunc.ResponsesAgent.create_text_delta() (only for streaming)
Here's an example of a complete tool calling sequence using ResponsesAgentResponse with a custom output:
import mlflow
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse


class SimpleResponsesAgent(ResponsesAgent):
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        return ResponsesAgentResponse(
            output=[
                # 1. the model asks for a tool call
                self.create_function_call_item(
                    id="fc_1",
                    call_id="call_1",
                    name="python_exec",
                    arguments='{"code":"result = 4 * 3\\nprint(result)"}',
                ),
                # 2. the tool result, linked to the call by call_id
                self.create_function_call_output_item(
                    call_id="call_1",
                    output="12\n",
                ),
                # 3. the final assistant message
                self.create_text_output_item(
                    text="The result of 4 * 3 in Python is 12.",
                    id="msg_1",
                ),
            ],
            custom_outputs={"key1": "custom-value1"},
        )
Streaming agent output
For real-time processing, you can use streaming events instead of returning a complete response. Streaming allows you to send partial results as they become available, which is useful for long-running operations or when you want to show progress to users.
Basic text streaming
To stream text within the ResponsesAgent interface, you should:
- yield response.output_text.delta events with the chunks as they become available
  - each event must have an item_id that ties related events to a single output item
- yield a response.output_item.done event to aggregate all chunks
from typing import Generator

from mlflow.types.responses import ResponsesAgentStreamEvent


class SimpleResponsesAgent(ResponsesAgent):
    # ... continuing from above
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        # stream text, all with the same item_id
        yield ResponsesAgentStreamEvent(
            **self.create_text_delta(delta="Hello", item_id="msg_1"),
        )
        # note the leading space so the chunks concatenate to "Hello world!"
        yield ResponsesAgentStreamEvent(
            **self.create_text_delta(delta=" world", item_id="msg_1"),
        )
        yield ResponsesAgentStreamEvent(
            **self.create_text_delta(delta="!", item_id="msg_1"),
        )
        # the text output item id should be the same
        # item_id as the streamed text deltas
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(
                text="Hello world!",
                id="msg_1",
            ),
        )
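On the consuming side, a client can rebuild the text by grouping delta events by item_id and comparing the result with the final response.output_item.done item. The following is a minimal sketch using plain dictionaries rather than MLflow objects. The event shapes (type, item_id, delta, item) are assumptions that mirror the examples in this section, and the function names are hypothetical.

```python
from collections import defaultdict


def aggregate_text_deltas(events):
    """Concatenate delta chunks per item_id, in arrival order."""
    chunks = defaultdict(list)
    for event in events:
        if event.get("type") == "response.output_text.delta":
            chunks[event["item_id"]].append(event["delta"])
    return {item_id: "".join(parts) for item_id, parts in chunks.items()}


def done_item_text(event):
    """Join the output_text parts of a response.output_item.done message item."""
    return "".join(
        part["text"]
        for part in event["item"].get("content", [])
        if part.get("type") == "output_text"
    )


# Assumed event dictionaries, shaped like the streamed events in this section.
events = [
    {"type": "response.output_text.delta", "item_id": "msg_1", "delta": "Hello"},
    {"type": "response.output_text.delta", "item_id": "msg_1", "delta": " world"},
    {"type": "response.output_text.delta", "item_id": "msg_1", "delta": "!"},
    {
        "type": "response.output_item.done",
        "item": {
            "id": "msg_1",
            "type": "message",
            "role": "assistant",
            "content": [{"type": "output_text", "text": "Hello world!"}],
        },
    },
]

streamed = aggregate_text_deltas(events)
final = done_item_text(events[-1])
```

If the streamed text and the final item text differ, the chunks were probably emitted with inconsistent item_id values or missing whitespace.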
Tool calling with streaming
You can also stream tool calls and their results. Each tool call and its output are sent as separate response.output_item.done events. This enables MLflow tracing and makes it easier for clients to reconstruct streamed message history.
from mlflow.types.responses import ResponsesAgentStreamEvent


class SimpleResponsesAgent(ResponsesAgent):
    # ... continuing from above
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_function_call_item(
                id="fc_1",
                call_id="call_1",
                name="python_exec",
                arguments='{"code":"result = 4 * 3\\nprint(result)"}',
            ),
        )
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_function_call_output_item(
                call_id="call_1",
                output="12\n",
            ),
        )
        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item=self.create_text_output_item(
                text="The result of 4 * 3 in Python is 12.",
                id="msg_1",
            ),
        )
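To illustrate how a client might reconstruct message history from such a stream, the sketch below collects the items carried by response.output_item.done events and pairs each function call with its output through call_id. It works on plain dictionaries, not MLflow objects. The item shapes are assumptions based on the examples in this document, and both helper names are hypothetical.

```python
def collect_output_items(events):
    """Keep only the items carried by response.output_item.done events."""
    return [
        event["item"]
        for event in events
        if event.get("type") == "response.output_item.done"
    ]


def pair_tool_calls(items):
    """Map each call_id to its function_call item and its output (None if absent)."""
    calls = {}
    for item in items:
        if item.get("type") == "function_call":
            calls.setdefault(item["call_id"], {"call": None, "output": None})
            calls[item["call_id"]]["call"] = item
        elif item.get("type") == "function_call_output":
            calls.setdefault(item["call_id"], {"call": None, "output": None})
            calls[item["call_id"]]["output"] = item["output"]
    return calls


# Assumed event dictionaries mirroring the streamed tool-calling sequence.
events = [
    {
        "type": "response.output_item.done",
        "item": {
            "type": "function_call",
            "id": "fc_1",
            "call_id": "call_1",
            "name": "python_exec",
            "arguments": '{"code":"result = 4 * 3\\nprint(result)"}',
        },
    },
    {
        "type": "response.output_item.done",
        "item": {"type": "function_call_output", "call_id": "call_1", "output": "12\n"},
    },
    {
        "type": "response.output_item.done",
        "item": {
            "type": "message",
            "id": "msg_1",
            "role": "assistant",
            "content": [
                {"type": "output_text", "text": "The result of 4 * 3 in Python is 12."}
            ],
        },
    },
]

history = collect_output_items(events)
tool_calls = pair_tool_calls(history)
```

The resulting history list can be appended to the next request's input to continue a multi-turn conversation.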
Logging and Serving
Log your agent using the Models-from-code approach. This approach is framework-agnostic and supports all authoring frameworks:
with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        python_model="agent.py",  # replace with your relative path to agent code
        name="agent",
    )
For ease of use, MLflow has built in the following features:
- Automatic model signature inference
  - An input and output signature that adheres to the ResponsesAgentRequest and ResponsesAgentResponse schemas will be set
- Metadata
  - {"task": "agent/v1/responses"} will be automatically appended to any metadata that you may pass in when logging the model
- Input Example
  - Providing an input example is optional; mlflow.types.responses.RESPONSES_AGENT_INPUT_EXAMPLE will be used by default
  - If you do provide an input example, ensure it's a dictionary of the ResponsesAgentRequest schema
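Before passing a custom input example, it can help to sanity-check its basic shape. The sketch below is a rough, standard-library-only pre-check and is not MLflow's own validation. The function name check_input_example is hypothetical, and the rules it applies (a dict with an "input" list whose message items carry "role" and "content") are assumptions drawn from the request examples in this document.

```python
def check_input_example(example):
    """Return a list of problems; an empty list means the basic shape looks right."""
    if not isinstance(example, dict):
        return ["input example must be a dict"]
    items = example.get("input")
    if not isinstance(items, list):
        return ["'input' must be a list"]
    problems = []
    for i, item in enumerate(items):
        if not isinstance(item, dict):
            problems.append(f"input[{i}] must be a dict")
        # Items without an explicit type are treated as messages here (assumption).
        elif item.get("type", "message") == "message" and not {"role", "content"} <= set(item):
            problems.append(f"input[{i}] is missing 'role' or 'content'")
    return problems


good = {"input": [{"role": "user", "content": "what is 4*3 in python"}]}
bad = {"input": [{"role": "user"}]}

good_problems = check_input_example(good)
bad_problems = check_input_example(bad)
```

The authoritative check remains the ResponsesAgentRequest schema itself, which MLflow applies when the model is logged and invoked.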
To test out a ResponsesAgent, you can pass a single input dictionary that follows the ResponsesAgentRequest schema both before and after logging it:
import mlflow

# load it back from mlflow
loaded_model = mlflow.pyfunc.load_model(logged_agent_info.model_uri)
loaded_model.predict(
    {
        "input": [{"role": "user", "content": "what is 4*3 in python"}],
        "context": {"conversation_id": "123", "user_id": "456"},
    }
)
To serve the model, refer to the Serving Options section for detailed instructions.
Examples
Simple Chat Example
Here's an example of a simple chat agent that calls OpenAI's gpt-4o model:
# uncomment below if running inside a jupyter notebook
# %%writefile agent.py
import os
from typing import Generator

import mlflow
from mlflow.entities.span import SpanType
from mlflow.models import set_model
from mlflow.pyfunc.model import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from openai import OpenAI


class SimpleResponsesAgent(ResponsesAgent):
    def __init__(self, model: str):
        self.client = OpenAI()
        self.model = model

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        response = self.client.responses.create(input=request.input, model=self.model)
        return ResponsesAgentResponse(**response.to_dict())

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        for event in self.client.responses.create(
            input=request.input, stream=True, model=self.model
        ):
            yield ResponsesAgentStreamEvent(**event.to_dict())


mlflow.openai.autolog()
agent = SimpleResponsesAgent(model="gpt-4o")
set_model(agent)
Tool Calling Example
Here's an example of an agent that calls OpenAI's gpt-4o model with a simple tool:
# uncomment below if running inside a jupyter notebook
# %%writefile agent.py
import json
from typing import Any, Callable, Generator
import os
from uuid import uuid4

import backoff
import mlflow
import openai
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from openai import OpenAI
from pydantic import BaseModel


class ToolInfo(BaseModel):
    """
    Class representing a tool for the agent.
    - "name" (str): The name of the tool.
    - "spec" (dict): JSON description of the tool (matches OpenAI Responses format)
    - "exec_fn" (Callable): Function that implements the tool logic
    """

    name: str
    spec: dict
    exec_fn: Callable


class ToolCallingAgent(ResponsesAgent):
    """
    Class representing a tool-calling Agent
    """

    def __init__(self, model: str, tools: list[ToolInfo]):
        """Initializes the ToolCallingAgent with tools."""
        self.model = model
        self.client: OpenAI = OpenAI()
        self._tools_dict = {tool.name: tool for tool in tools}

    def get_tool_specs(self) -> list[dict]:
        """Returns tool specifications in the format OpenAI expects."""
        return [tool_info.spec for tool_info in self._tools_dict.values()]

    @mlflow.trace(span_type=SpanType.TOOL)
    def execute_tool(self, tool_name: str, args: dict) -> Any:
        """Executes the specified tool with the given arguments."""
        return self._tools_dict[tool_name].exec_fn(**args)

    @backoff.on_exception(backoff.expo, openai.RateLimitError)
    @mlflow.trace(span_type=SpanType.LLM)
    def call_llm(self, input_messages) -> dict[str, Any]:
        """Calls the model and returns its first output item as a dictionary."""
        return (
            self.client.responses.create(
                model=self.model,
                input=input_messages,
                tools=self.get_tool_specs(),
            )
            .output[0]
            .model_dump(exclude_none=True)
        )

    def handle_tool_call(self, tool_call: dict[str, Any]) -> ResponsesAgentStreamEvent:
        """
        Execute tool calls and return a ResponsesAgentStreamEvent w/ tool output
        """
        args = json.loads(tool_call["arguments"])
        result = str(self.execute_tool(tool_name=tool_call["name"], args=args))

        tool_call_output = {
            "type": "function_call_output",
            "call_id": tool_call["call_id"],
            "output": result,
        }
        return ResponsesAgentStreamEvent(
            type="response.output_item.done", item=tool_call_output
        )

    def call_and_run_tools(
        self,
        input_messages,
        max_iter: int = 10,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        for _ in range(max_iter):
            last_msg = input_messages[-1]
            # stop once the model has produced a final assistant message
            if (
                last_msg.get("type", None) == "message"
                and last_msg.get("role", None) == "assistant"
            ):
                return
            # run the requested tool and feed its output back to the model
            if last_msg.get("type", None) == "function_call":
                tool_call_res = self.handle_tool_call(last_msg)
                input_messages.append(tool_call_res.item)
                yield tool_call_res
            else:
                llm_output = self.call_llm(input_messages=input_messages)
                input_messages.append(llm_output)
                yield ResponsesAgentStreamEvent(
                    type="response.output_item.done",
                    item=llm_output,
                )

        yield ResponsesAgentStreamEvent(
            type="response.output_item.done",
            item={
                "id": str(uuid4()),
                "content": [
                    {
                        "type": "output_text",
                        "text": "Max iterations reached. Stopping.",
                    }
                ],
                "role": "assistant",
                "type": "message",
            },
        )

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        return ResponsesAgentResponse(
            output=outputs, custom_outputs=request.custom_inputs
        )

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        input_messages = [{"role": "system", "content": SYSTEM_PROMPT}] + [
            i.model_dump() for i in request.input
        ]
        yield from self.call_and_run_tools(input_messages=input_messages)


tools = [
    ToolInfo(
        name="get_weather",
        spec={
            "type": "function",
            "name": "get_weather",
            "description": "Get current temperature for provided coordinates in celsius.",
            "parameters": {
                "type": "object",
                "properties": {
                    "latitude": {"type": "number"},
                    "longitude": {"type": "number"},
                },
                "required": ["latitude", "longitude"],
                "additionalProperties": False,
            },
            "strict": True,
        },
        exec_fn=lambda latitude, longitude: 70,  # dummy tool implementation
    )
]

os.environ["OPENAI_API_KEY"] = "your OpenAI API key"

SYSTEM_PROMPT = "You are a helpful assistant that can call tools to get information."
mlflow.openai.autolog()
AGENT = ToolCallingAgent(model="gpt-4o", tools=tools)
mlflow.models.set_model(AGENT)
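To exercise the control flow of a tool-calling loop without an OpenAI key, the loop can be driven by a stubbed model. The sketch below is a simplified, standard-library-only approximation of the loop in call_and_run_tools. It is not the agent above: run_tool_loop and fake_llm are hypothetical names, the message dictionaries are assumed shapes, and the "max iterations reached" message is omitted for brevity.

```python
import json


def run_tool_loop(messages, call_llm, tools, max_iter=10):
    """Alternate between model calls and tool execution until an assistant message appears."""
    produced = []
    for _ in range(max_iter):
        last = messages[-1]
        if last.get("type") == "message" and last.get("role") == "assistant":
            break  # the model has given its final answer
        if last.get("type") == "function_call":
            # Run the requested tool and link the result back through call_id.
            args = json.loads(last["arguments"])
            item = {
                "type": "function_call_output",
                "call_id": last["call_id"],
                "output": str(tools[last["name"]](**args)),
            }
        else:
            item = call_llm(messages)
        messages.append(item)
        produced.append(item)
    return produced


def fake_llm(messages):
    """Hypothetical stand-in for the model: request the tool once, then answer."""
    outputs = [m for m in messages if m.get("type") == "function_call_output"]
    if not outputs:
        return {
            "type": "function_call",
            "id": "fc_1",
            "call_id": "call_1",
            "name": "get_weather",
            "arguments": json.dumps({"latitude": 42.36, "longitude": -71.06}),
        }
    text = f"It is {outputs[-1]['output']} degrees."
    return {
        "type": "message",
        "id": "msg_1",
        "role": "assistant",
        "content": [{"type": "output_text", "text": text}],
    }


tools = {"get_weather": lambda latitude, longitude: 70}  # dummy tool
conversation = [{"role": "user", "content": "What is the weather in Boston?"}]
items = run_tool_loop(conversation, fake_llm, tools)
```

Swapping fake_llm for a real model call keeps the loop unchanged, which makes this pattern convenient for unit tests.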
Serving Options
- Local Serving
- Docker Deployment
- Databricks Model Serving
Local Serving
Start a local server for development and testing:
mlflow models serve -m models:/<model_id> -p 5000
Test the served model:
import requests

# the request body follows the ResponsesAgentRequest schema, which uses "input"
response = requests.post(
    "http://localhost:5000/invocations",
    json={"input": [{"role": "user", "content": "What's the weather like?"}]},
)
print(response.json())
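If the requests package is not available, the same call can be made with the standard library. The sketch below only builds the HTTP request so it can be inspected without a running server. The helper name build_invocation_request is hypothetical, and the payload assumes the "input" key used by the ResponsesAgentRequest examples in this document.

```python
import json
import urllib.request


def build_invocation_request(base_url, user_text):
    """Build a POST request for the /invocations endpoint of a served agent."""
    payload = {"input": [{"role": "user", "content": user_text}]}
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/invocations",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_invocation_request("http://localhost:5000", "What's the weather like?")

# To send it against a running server, uncomment:
# with urllib.request.urlopen(req) as resp:
#     print(json.loads(resp.read()))
```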
Docker Deployment
Build a Docker image for the logged model, then run it:
mlflow models build-docker -m runs:/<run_id>/responses_agent -n my-responses-agent
docker run -p 5000:8080 my-responses-agent
Databricks Model Serving
Endpoint creation and management features are available in Databricks' managed MLflow service but not in open-source MLflow.
Prerequisite: register the logged model in Databricks Unity Catalog:
import mlflow

mlflow.register_model("models:/<model_id>", "<catalog>.<schema>.<model_name>")
For managed MLflow on Databricks, ResponsesAgent models integrate seamlessly with Databricks Model Serving:
from mlflow.deployments import get_deploy_client

client = get_deploy_client("databricks")
endpoint = client.create_endpoint(
    name="unity-catalog-model-endpoint",
    config={
        "served_entities": [
            {
                "name": "ads-entity",
                "entity_name": "catalog.schema.my-ads-model",
                "entity_version": "3",
                "workload_size": "Small",
                "scale_to_zero_enabled": True,
            }
        ],
        "traffic_config": {
            "routes": [
                {"served_model_name": "my-ads-model-3", "traffic_percentage": 100}
            ]
        },
    },
)
Schema and Types
ResponsesAgent works with structured schemas for requests and responses. See ResponsesAgentRequest and ResponsesAgentResponse for the detailed structure.
# Example Request schema
{
    "input": [
        {
            "role": "user",
            "content": "What is the weather like in Boston today?",
        }
    ],
    "tools": [
        {
            "type": "function",
            "name": "get_current_weather",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"},
                    "unit": {"type": "string"},
                },
                "required": ["location", "unit"],
            },
        }
    ],
}
# Example Response schema
{
    "output": [
        {
            "type": "message",
            "id": "some-id",
            "status": "completed",
            "role": "assistant",
            "content": [
                {
                    "type": "output_text",
                    "text": "rainy",
                }
            ],
        }
    ],
}
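A common client-side task is pulling the assistant's text out of a response of this shape. The sketch below does that for a plain dictionary. The helper name extract_output_text is hypothetical, and it assumes that text lives in output_text parts of message items, as in the example response above.

```python
def extract_output_text(response):
    """Collect the text of every output_text part found in message items."""
    texts = []
    for item in response.get("output", []):
        if item.get("type") != "message":
            continue  # skip function calls and function call outputs
        for part in item.get("content", []):
            if part.get("type") == "output_text":
                texts.append(part["text"])
    return "\n".join(texts)


response = {
    "output": [
        {"type": "function_call_output", "call_id": "call_1", "output": "rainy"},
        {
            "type": "message",
            "id": "some-id",
            "status": "completed",
            "role": "assistant",
            "content": [{"type": "output_text", "text": "rainy"}],
        },
    ],
}

answer = extract_output_text(response)
```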
Troubleshooting
Common Issues
- Import Errors: Ensure all dependencies are included in the conda environment
- Schema Validation: Verify input/output formats match expected schemas
- Model Logging Issues: Use the models-from-code feature to log the model
- Missing traces: Enable tracing with mlflow.<flavor>.autolog, or add manual tracing with mlflow.start_span
Debugging
If your agent is instrumented with tracing, enable it and run the agent locally to troubleshoot:
import mlflow

# Enable autologging if your agent uses libraries supported by MLflow tracing, such as LangChain or OpenAI.
# mlflow.<flavor>.autolog()

# Test your agent locally before serving.
# MyResponsesAgent is a placeholder for your own ResponsesAgent subclass.
agent = MyResponsesAgent()
agent.predict(
    {
        "input": [{"role": "user", "content": "what is 4*3 in python"}],
        "context": {"conversation_id": "123", "user_id": "456"},
    }
)