ResponsesAgent Introduction
What is a ResponsesAgent?
ResponsesAgent is a subclass of PythonModel that provides a framework-agnostic way to create an agent model. Authoring an agent using ResponsesAgent provides the following benefits:
- Support for returning multiple output messages, including intermediate outputs from tool calling
- Support for multi-agent scenarios
- Compatibility with MLflow logging, tracing, and model serving
- Compatibility with the OpenAI Responses API, making your model work with OpenAI's Responses client and other downstream UIs/applications
We recommend ResponsesAgent over ChatModel and ChatAgent, as it has all the benefits of ChatAgent and supports additional features such as annotations.
Author a ResponsesAgent
Getting started
To create your own agent, subclass mlflow.pyfunc.ResponsesAgent and implement your agent logic in the predict method. The implementation is framework-agnostic, allowing you to use any agent authoring framework. Note that pydantic>=2 is required to use ResponsesAgent. For example implementations, see the simple chat agent and the tool-calling agent below.
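As a minimal sketch of the shape this takes (not an official MLflow example; the echo logic, and the assumption that the last input item is a plain user message with string content, are only illustrative):
import mlflow
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse
class EchoResponsesAgent(ResponsesAgent):
    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        # Illustrative assumption: the last input item is a user message with string content
        last_user_text = request.input[-1].content
        return ResponsesAgentResponse(
            output=[
                self.create_text_output_item(
                    text=f"You said: {last_user_text}",
                    id="msg_1",
                )
            ]
        )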
Creating agent output
When implementing your agent, you'll work with two main output types: ResponsesAgentResponse and ResponsesAgentStreamEvent. These are the only pydantic objects you should create directly; the remaining classes in mlflow.types.responses_helpers are only for validating dictionaries.
If you want to return outputs that don't fit into the standard interface, you can use the custom_outputs field.
Below are some helper methods you can use to create common outputs within the ResponsesAgent interface:
- mlflow.pyfunc.ResponsesAgent.create_text_output_item()
- mlflow.pyfunc.ResponsesAgent.create_function_call_item()
- mlflow.pyfunc.ResponsesAgent.create_function_call_output_item()
- mlflow.pyfunc.ResponsesAgent.create_text_delta() (only for streaming)
Here's an example of a complete tool calling sequence using ResponsesAgentResponse
with a custom output:
import mlflow
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import ResponsesAgentRequest, ResponsesAgentResponse
class SimpleResponsesAgent(ResponsesAgent):
@mlflow.trace(span_type=SpanType.AGENT)
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
return ResponsesAgentResponse(
output=[
self.create_function_call_item(
id="fc_1",
call_id="call_1",
name="python_exec",
arguments='{"code":"result = 4 * 3\\nprint(result)"}',
),
self.create_function_call_output_item(
call_id="call_1",
output="12\n",
),
self.create_text_output_item(
text="The result of 4 * 3 in Python is 12.",
id="msg_1",
),
],
custom_outputs={"key1": "custom-value1"},
)
Streaming agent output
For real-time processing, you can use streaming events instead of returning a complete response. Streaming allows you to send partial results as they become available, which is useful for long-running operations or when you want to show progress to users.
Basic text streaming
To stream text within the ResponsesAgent interface, you should:
- yield response.output_text.delta events with the chunks as they become available; each event must have an item_id that ties related events to a single output item
- yield a response.output_item.done event that aggregates all of the chunks
from typing import Generator
from mlflow.types.responses import ResponsesAgentStreamEvent
class SimpleResponsesAgent(ResponsesAgent):
# ... continuing from above
@mlflow.trace(span_type=SpanType.AGENT)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
# stream text, all with the same item_id
yield ResponsesAgentStreamEvent(
**self.create_text_delta(delta="Hello", item_id="msg_1"),
)
yield ResponsesAgentStreamEvent(
**self.create_text_delta(delta="world", item_id="msg_1"),
)
yield ResponsesAgentStreamEvent(
**self.create_text_delta(delta="!", item_id="msg_1"),
)
# the text output item id should be the same
# item_id as the streamed text deltas
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(
text="Hello world!",
id="msg_1",
),
)
Tool calling with streaming
You can also stream tool calls and their results. Each tool call and its output are sent as separate response.output_item.done events. This enables MLflow tracing and makes it easier for clients to reconstruct streamed message history.
from mlflow.types.responses import ResponsesAgentStreamEvent
class SimpleResponsesAgent(ResponsesAgent):
# ... continuing from above
@mlflow.trace(span_type=SpanType.AGENT)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_function_call_item(
id="fc_1",
call_id="call_1",
name="python_exec",
arguments='{"code":"result = 4 * 3\\nprint(result)"}',
),
)
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_function_call_output_item(
call_id="call_1",
output="12\n",
),
)
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=self.create_text_output_item(
text="The result of 4 * 3 in Python is 12.",
id="msg_1",
),
)
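On the consumer side, a client of predict_stream can rebuild the full message history by keeping only the completed output items. A minimal sketch (this helper is not part of MLflow):
def collect_output_items(events):
    """Collect the items from response.output_item.done events in a stream."""
    return [event.item for event in events if event.type == "response.output_item.done"]
# e.g. output_items = collect_output_items(agent.predict_stream(request))
# output_items then holds the function call, its output, and the final text message, in order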
Log your agent
Log your agent using the Models-from-code approach. This approach is framework-agnostic and supports all authoring frameworks:
import mlflow
with mlflow.start_run():
logged_agent_info = mlflow.pyfunc.log_model(
python_model="agent.py", # replace with your relative path to agent code
name="agent",
)
For ease of use, MLflow has the following features built in:
- Automatic model signature inference
  - An input and output signature that adheres to the ResponsesAgentRequest and ResponsesAgentResponse schemas will be set
- Metadata
  - {"task": "agent/v1/responses"} will be automatically appended to any metadata that you may pass in when logging the model
- Input example
  - Providing an input example is optional; mlflow.types.responses.RESPONSES_AGENT_INPUT_EXAMPLE will be used by default
  - If you do provide an input example, ensure it's a dictionary that follows the ResponsesAgentRequest schema
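For instance, here's a sketch of logging with an explicit input example; the dictionary follows the ResponsesAgentRequest schema and the values are placeholders:
import mlflow
input_example = {
    "input": [{"role": "user", "content": "what is 4*3 in python"}],
    "context": {"conversation_id": "123", "user_id": "456"},
}
with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        python_model="agent.py",  # replace with your relative path to agent code
        name="agent",
        input_example=input_example,
    )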
Testing out your agent
To test out a ResponsesAgent, you can pass a single input dictionary that follows the ResponsesAgentRequest schema, both before and after logging it:
import mlflow
from mlflow.pyfunc import ResponsesAgent
class MyResponsesAgent(ResponsesAgent):
...
responses_agent = MyResponsesAgent()
responses_agent.predict(
{
"input": [{"role": "user", "content": "what is 4*3 in python"}],
"context": {"conversation_id": "123", "user_id": "456"},
}
)
# ... log responses_agent using code from above
# load it back from mlflow
loaded_model = mlflow.pyfunc.load_model(logged_agent_info.model_uri)
loaded_model.predict(
{
"input": [{"role": "user", "content": "what is 4*3 in python"}],
"context": {"conversation_id": "123", "user_id": "456"},
}
)
Migrating from ChatAgent
When migrating from ChatAgent to ResponsesAgent, the primary task is adapting your message formats from the ChatCompletion API to the Responses API schema. For detailed information about these changes, refer to the OpenAI documentation.
If the LLM provider you are using uses chat completions, you can adapt the helper function below to convert your ResponsesAgent output into chat completions messages and support multi-turn agent chat:
from typing import Any
def convert_to_chat_completion_format(message: dict[str, Any]) -> list[dict[str, Any]]:
"""Convert from ResponsesAgent output to a ChatCompletions compatible list of messages"""
msg_type = message.get("type", None)
if msg_type == "function_call":
return [
{
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": message["call_id"],
"type": "function",
"function": {
"arguments": message["arguments"],
"name": message["name"],
},
}
],
}
]
elif msg_type == "message" and isinstance(message["content"], list):
return [
{"role": message["role"], "content": content["text"]}
for content in message["content"]
]
elif msg_type == "function_call_output":
return [
{
"role": "tool",
"content": message["output"],
"tool_call_id": message["call_id"],
}
]
compatible_keys = ["role", "content", "name", "tool_calls", "tool_call_id"]
return [{k: v for k, v in message.items() if k in compatible_keys}]
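As an illustration, you could flatten a list of ResponsesAgent output items (such as the tool-calling sequence shown earlier) into ChatCompletions messages using the helper above:
responses_output = [
    {
        "type": "function_call",
        "id": "fc_1",
        "call_id": "call_1",
        "name": "python_exec",
        "arguments": '{"code":"result = 4 * 3\\nprint(result)"}',
    },
    {"type": "function_call_output", "call_id": "call_1", "output": "12\n"},
    {
        "type": "message",
        "role": "assistant",
        "content": [{"type": "output_text", "text": "The result of 4 * 3 in Python is 12."}],
    },
]
chat_messages = []
for item in responses_output:
    chat_messages.extend(convert_to_chat_completion_format(item))
# chat_messages is now a ChatCompletions-style message list you can send back to the LLM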
The ResponsesAgent interface extends all functionality previously available in ChatAgent, while introducing new features. Below, we outline the key differences in message representation between the two interfaces for common use cases:
Standard Text Response​
ResponsesAgent​
{
"type": "message",
"id": "",
"content": [
{
"annotations": [],
"text": "",
"type": "output_text"
}
],
"role": "assistant",
"status": "completed"
}
ChatAgent​
{
"role": "assistant",
"content": ""
}
Tool Calls​
ResponsesAgent​
{
"type": "function_call",
"id": "fc_1",
"arguments": "",
"call_id": "call_1",
"name": "",
"status": "completed"
}
ChatAgent​
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "",
"arguments": ""
}
}
]
}
Tool Call Results​
ResponsesAgent​
{
"type": "function_call_output",
"call_id": "call_1",
"output": ""
}
ChatAgent​
{
"role": "tool",
"content": "12",
"tool_call_id": "call_1"
}
Tool Definitions​
ResponsesAgent​
{
"name": "",
"parameters": {},
"strict": true,
"type": "function",
"description": ""
}
ChatAgent​
{
"type": "function",
"function": {
"name": "",
"description": "",
"parameters": {},
"strict": true
}
}
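If you already have ChatAgent-style tool definitions, a small helper like this illustrative sketch (not part of MLflow) can reshape them into the Responses format shown above:
def chat_tool_to_responses_tool(tool: dict) -> dict:
    """Flatten a ChatCompletions-style tool definition into the Responses format."""
    fn = tool["function"]
    return {
        "type": "function",
        "name": fn["name"],
        "description": fn.get("description", ""),
        "parameters": fn.get("parameters", {}),
        "strict": fn.get("strict", True),
    }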
Simple Chat Example
Here's an example of a simple chat agent that calls OpenAI's gpt-4o model:
# uncomment below if running inside a jupyter notebook
# %%writefile agent.py
import os
from typing import Generator
import mlflow
from mlflow.entities.span import SpanType
from mlflow.models import set_model
from mlflow.pyfunc.model import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
from openai import OpenAI
class SimpleResponsesAgent(ResponsesAgent):
def __init__(self, model: str):
self.client = OpenAI()
self.model = model
@mlflow.trace(span_type=SpanType.AGENT)
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
response = self.client.responses.create(input=request.input, model=self.model)
return ResponsesAgentResponse(**response.to_dict())
@mlflow.trace(span_type=SpanType.AGENT)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
for event in self.client.responses.create(
input=request.input, stream=True, model=self.model
):
yield ResponsesAgentStreamEvent(**event.to_dict())
mlflow.openai.autolog()
agent = SimpleResponsesAgent(model="gpt-4o")
set_model(agent)
Tool Calling Example
Here's an example of an agent that calls OpenAI's gpt-4o model with a simple tool:
# uncomment below if running inside a jupyter notebook
# %%writefile agent.py
import json
from typing import Any, Callable, Generator
import os
from uuid import uuid4
import backoff
import mlflow
import openai
from mlflow.entities import SpanType
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
from openai import OpenAI
from pydantic import BaseModel
class ToolInfo(BaseModel):
"""
Class representing a tool for the agent.
- "name" (str): The name of the tool.
- "spec" (dict): JSON description of the tool (matches OpenAI Responses format)
- "exec_fn" (Callable): Function that implements the tool logic
"""
name: str
spec: dict
exec_fn: Callable
class ToolCallingAgent(ResponsesAgent):
"""
Class representing a tool-calling Agent
"""
def __init__(self, model: str, tools: list[ToolInfo]):
"""Initializes the ToolCallingAgent with tools."""
self.model = model
self.client: OpenAI = OpenAI()
self._tools_dict = {tool.name: tool for tool in tools}
def get_tool_specs(self) -> list[dict]:
"""Returns tool specifications in the format OpenAI expects."""
return [tool_info.spec for tool_info in self._tools_dict.values()]
@mlflow.trace(span_type=SpanType.TOOL)
def execute_tool(self, tool_name: str, args: dict) -> Any:
"""Executes the specified tool with the given arguments."""
return self._tools_dict[tool_name].exec_fn(**args)
@backoff.on_exception(backoff.expo, openai.RateLimitError)
@mlflow.trace(span_type=SpanType.LLM)
def call_llm(self, input_messages) -> dict[str, Any]:
return (
self.client.responses.create(
model=self.model,
input=input_messages,
tools=self.get_tool_specs(),
)
.output[0]
.model_dump(exclude_none=True)
)
def handle_tool_call(self, tool_call: dict[str, Any]) -> ResponsesAgentStreamEvent:
"""
Execute tool calls and return a ResponsesAgentStreamEvent w/ tool output
"""
args = json.loads(tool_call["arguments"])
result = str(self.execute_tool(tool_name=tool_call["name"], args=args))
tool_call_output = {
"type": "function_call_output",
"call_id": tool_call["call_id"],
"output": result,
}
return ResponsesAgentStreamEvent(
type="response.output_item.done", item=tool_call_output
)
def call_and_run_tools(
self,
input_messages,
max_iter: int = 10,
) -> Generator[ResponsesAgentStreamEvent, None, None]:
for _ in range(max_iter):
last_msg = input_messages[-1]
if (
last_msg.get("type", None) == "message"
and last_msg.get("role", None) == "assistant"
):
return
if last_msg.get("type", None) == "function_call":
tool_call_res = self.handle_tool_call(last_msg)
input_messages.append(tool_call_res.item)
yield tool_call_res
else:
llm_output = self.call_llm(input_messages=input_messages)
input_messages.append(llm_output)
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item=llm_output,
)
yield ResponsesAgentStreamEvent(
type="response.output_item.done",
item={
"id": str(uuid4()),
"content": [
{
"type": "output_text",
"text": "Max iterations reached. Stopping.",
}
],
"role": "assistant",
"type": "message",
},
)
@mlflow.trace(span_type=SpanType.AGENT)
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
outputs = [
event.item
for event in self.predict_stream(request)
if event.type == "response.output_item.done"
]
return ResponsesAgentResponse(
output=outputs, custom_outputs=request.custom_inputs
)
@mlflow.trace(span_type=SpanType.AGENT)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
input_messages = [{"role": "system", "content": SYSTEM_PROMPT}] + [
i.model_dump() for i in request.input
]
yield from self.call_and_run_tools(input_messages=input_messages)
tools = [
ToolInfo(
name="get_weather",
spec={
"type": "function",
"name": "get_weather",
"description": "Get current temperature for provided coordinates in celsius.",
"parameters": {
"type": "object",
"properties": {
"latitude": {"type": "number"},
"longitude": {"type": "number"},
},
"required": ["latitude", "longitude"],
"additionalProperties": False,
},
"strict": True,
},
exec_fn=lambda latitude, longitude: 70, # dummy tool implementation
)
]
os.environ["OPENAI_API_KEY"] = "your OpenAI API key"
SYSTEM_PROMPT = "You are a helpful assistant that can call tools to get information."
mlflow.openai.autolog()
AGENT = ToolCallingAgent(model="gpt-4o", tools=tools)
mlflow.models.set_model(AGENT)
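As a quick smoke test, you can invoke the agent directly with a dictionary that follows the ResponsesAgentRequest schema, mirroring the pattern from the testing section above (the question below is only an illustration):
resp = AGENT.predict(
    {
        "input": [
            {"role": "user", "content": "What's the weather at latitude 40.7, longitude -74.0?"}
        ]
    }
)
for item in resp.output:
    print(item)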