StreamContentAccumulator

class StreamContentAccumulator:
Manages content accumulation across streaming responses to ensure all responses contain complete cumulative content.

init

def __init__(self):

set_base_content

def set_base_content(self, content: str):
Set the base content (usually empty or pre-tool content).

add_streaming_content

def add_streaming_content(self, new_content: str):
Add new streaming content.

add_tool_status

def add_tool_status(self, status_message: str):
Add a tool status message.

get_full_content

def get_full_content(self):
Get the complete accumulated content.

get_content_with_new_status

def get_content_with_new_status(self, status_message: str):
Get content with a new status message appended.

reset_streaming_content

def reset_streaming_content(self):
Reset only the streaming content, keep base and tool status.
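A minimal usage sketch of the accumulator follows; the ordering of base content, tool status messages, and streaming content in get_full_content is assumed here, and the exact concatenation format is an implementation detail:

accumulator = StreamContentAccumulator()
accumulator.set_base_content("")                    # no pre-tool content yet

accumulator.add_streaming_content("Searching")      # partial deltas arrive
accumulator.add_streaming_content(" the web...")

accumulator.add_tool_status("[Calling tool: search_web]")
accumulator.reset_streaming_content()               # keep base and tool status

accumulator.add_streaming_content("Here are the results.")
print(accumulator.get_full_content())               # complete cumulative content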

StreamingChatAgentResponse

class StreamingChatAgentResponse:
A wrapper that makes streaming responses compatible with non-streaming code. This class wraps a Generator[ChatAgentResponse, None, None] and provides the same interface as ChatAgentResponse, so existing code doesn’t need to change.
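Below is a brief sketch of the two ways this wrapper can be consumed; it assumes agent.step(...) returns a StreamingChatAgentResponse when the underlying model backend is configured for streaming:

streaming_response = agent.step("Summarize the report.")

# Option 1: treat it like a plain ChatAgentResponse (the generator is consumed lazily).
print(streaming_response.msgs[0].content)

# Option 2: iterate to observe intermediate cumulative responses as they arrive.
for partial in streaming_response:
    print(partial.msgs[0].content)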

init

def __init__(self, generator: Generator[ChatAgentResponse, None, None]):

_ensure_latest_response

def _ensure_latest_response(self):
Ensure we have the latest response by consuming the generator.

msgs

def msgs(self):
Get messages from the latest response.

terminated

def terminated(self):
Get terminated status from the latest response.

info

def info(self):
Get info from the latest response.

msg

def msg(self):
Get the single message if there’s exactly one message.

iter

def __iter__(self):
Make this object iterable.

getattr

def __getattr__(self, name):
Forward any other attribute access to the latest response.

AsyncStreamingChatAgentResponse

class AsyncStreamingChatAgentResponse:
A wrapper that makes async streaming responses awaitable and compatible with non-streaming code. This class wraps an AsyncGenerator[ChatAgentResponse, None] and provides both awaitable and async iterable interfaces.
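A short sketch of both interfaces, assuming agent.astep(...) returns an AsyncStreamingChatAgentResponse when the backend streams:

import asyncio

async def main():
    # Await to obtain only the final, fully accumulated response...
    final = await agent.astep("Summarize the report.")
    print(final.msgs[0].content)

    # ...or iterate asynchronously to observe intermediate responses.
    async for partial in agent.astep("Summarize the report."):
        print(partial.msgs[0].content)

asyncio.run(main())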

init

def __init__(self, async_generator: AsyncGenerator[ChatAgentResponse, None]):

await

def __await__(self):
Make this object awaitable - returns the final response.

aiter

def __aiter__(self):
Make this object async iterable.

ChatAgent

class ChatAgent(BaseAgent):
Class for managing conversations of CAMEL Chat Agents. Parameters:
  • system_message (Union[BaseMessage, str], optional): The system message for the chat agent. (default: :obj:None)
  • model (Union[BaseModelBackend, Tuple[str, str], str, ModelType, Tuple[ModelPlatformType, ModelType], List[BaseModelBackend], List[str], List[ModelType], List[Tuple[str, str]], List[Tuple[ModelPlatformType, ModelType]]], optional): The model backend(s) to use. Can be a single instance, a specification (string, enum, tuple), or a list of instances or specifications to be managed by ModelManager. If a list of specifications (not BaseModelBackend instances) is provided, they will be instantiated using ModelFactory. (default: :obj:ModelPlatformType.DEFAULT with ModelType.DEFAULT)
  • memory (AgentMemory, optional): The agent memory for managing chat messages. If None, a :obj:ChatHistoryMemory will be used. (default: :obj:None)
  • message_window_size (int, optional): The maximum number of previous messages to include in the context window. If None, no windowing is performed. (default: :obj:None)
  • token_limit (int, optional): The maximum number of tokens in a context. The context will be automatically pruned to fulfill the limitation. If None, it will be set according to the backend model. (default: :obj:None)
  • output_language (str, optional): The language to be output by the agent. (default: :obj:None)
  • tools (Optional[List[Union[FunctionTool, Callable]]], optional): List of available :obj:FunctionTool or :obj:Callable. (default: :obj:None)
  • toolkits_to_register_agent (Optional[List[RegisteredAgentToolkit]], optional): List of toolkit instances that inherit from :obj:RegisteredAgentToolkit. The agent will register itself with these toolkits, allowing them to access the agent instance. Note: This does NOT add the toolkit's tools to the agent. To use tools from these toolkits, pass them explicitly via the tools parameter. (default: :obj:None)
  • external_tools (Optional[List[Union[FunctionTool, Callable, Dict[str, Any]]]], optional): List of external tools (:obj:FunctionTool or :obj:Callable or :obj:Dict[str, Any]) bound to one chat agent. When these tools are called, the agent will directly return the request instead of processing it. (default: :obj:None)
  • response_terminators (List[ResponseTerminator], optional): List of :obj:ResponseTerminator bound to one chat agent. (default: :obj:None)
  • scheduling_strategy (str): Name of the function that defines how to select the next model in ModelManager. (default: :obj:round_robin)
  • max_iteration (Optional[int], optional): Maximum number of model calling iterations allowed per step. If None (default), there’s no explicit limit. If 1, it performs a single model call. If N > 1, it allows up to N model calls. (default: :obj:None)
  • agent_id (str, optional): The ID of the agent. If not provided, a random UUID will be generated. (default: :obj:None)
  • stop_event (Optional[threading.Event], optional): Event to signal termination of the agent’s operation. When set, the agent will terminate its execution. (default: :obj:None)
  • tool_execution_timeout (Optional[float], optional): Timeout for individual tool execution. If None, wait indefinitely. (default: :obj:None)
  • mask_tool_output (Optional[bool]): Whether to return a sanitized placeholder instead of the raw tool output. (default: :obj:False)
  • pause_event (Optional[asyncio.Event]): Event to signal pause of the agent’s operation. When clear, the agent will pause its execution. (default: :obj:None)
  • prune_tool_calls_from_memory (bool): Whether to clean tool call messages from memory after response generation to save token usage. When enabled, removes FUNCTION/TOOL role messages and ASSISTANT messages with tool_calls after each step. (default: :obj:False)
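A minimal construction sketch, assuming the usual camel.agents, camel.models, and camel.types import paths:

from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.types import ModelPlatformType, ModelType

model = ModelFactory.create(
    model_platform=ModelPlatformType.DEFAULT,
    model_type=ModelType.DEFAULT,
)

agent = ChatAgent(
    system_message="You are a helpful assistant.",
    model=model,
    max_iteration=3,  # allow up to three model calls per step
)

response = agent.step("What is the CAMEL framework?")
print(response.msgs[0].content)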

init

def __init__(
    self,
    system_message: Optional[Union[BaseMessage, str]] = None,
    model: Optional[Union[BaseModelBackend, ModelManager, Tuple[str, str], str, ModelType, Tuple[ModelPlatformType, ModelType], List[BaseModelBackend], List[str], List[ModelType], List[Tuple[str, str]], List[Tuple[ModelPlatformType, ModelType]]]] = None,
    memory: Optional[AgentMemory] = None,
    message_window_size: Optional[int] = None,
    token_limit: Optional[int] = None,
    output_language: Optional[str] = None,
    tools: Optional[List[Union[FunctionTool, Callable]]] = None,
    toolkits_to_register_agent: Optional[List[RegisteredAgentToolkit]] = None,
    external_tools: Optional[List[Union[FunctionTool, Callable, Dict[str, Any]]]] = None,
    response_terminators: Optional[List[ResponseTerminator]] = None,
    scheduling_strategy: str = 'round_robin',
    max_iteration: Optional[int] = None,
    agent_id: Optional[str] = None,
    stop_event: Optional[threading.Event] = None,
    tool_execution_timeout: Optional[float] = None,
    mask_tool_output: bool = False,
    pause_event: Optional[asyncio.Event] = None,
    prune_tool_calls_from_memory: bool = False
):

reset

def reset(self):
Resets the :obj:ChatAgent to its initial state.

_resolve_models

def _resolve_models(
    self,
    model: Optional[Union[BaseModelBackend, Tuple[str, str], str, ModelType, Tuple[ModelPlatformType, ModelType], List[BaseModelBackend], List[str], List[ModelType], List[Tuple[str, str]], List[Tuple[ModelPlatformType, ModelType]]]]
):
Resolves model specifications into model backend instances. This method handles various input formats for model specifications and returns the appropriate model backend(s). Parameters:
  • model: Model specification in various formats including single model, list of models, or model type specifications.
Returns: Union[BaseModelBackend, List[BaseModelBackend]]: Resolved model backend(s).

_resolve_model_list

def _resolve_model_list(self, model_list: list):
Resolves a list of model specifications into model backend instances. Parameters:
  • model_list (list): List of model specifications in various formats.
Returns: Union[BaseModelBackend, List[BaseModelBackend]]: Resolved model backend(s).

system_message

def system_message(self):
Returns the system message for the agent.

tool_dict

def tool_dict(self):
Returns a dictionary of internal tools.

output_language

def output_language(self):
Returns the output language for the agent.

output_language

def output_language(self, value: str):
Set the output language for the agent. Note that this will clear the message history.

memory

def memory(self):
Returns the agent memory.

memory

def memory(self, value: AgentMemory):
Set the agent memory. When setting a new memory, the system message is automatically re-added to ensure it’s not lost. Parameters:
  • value (AgentMemory): The new agent memory to use.

_get_full_tool_schemas

def _get_full_tool_schemas(self):
Returns a list of tool schemas of all tools, including internal and external tools.

_get_external_tool_names

def _get_external_tool_names(self):
Returns a set of external tool names.

add_tool

def add_tool(self, tool: Union[FunctionTool, Callable]):
Add a tool to the agent.

add_tools

def add_tools(self, tools: List[Union[FunctionTool, Callable]]):
Add a list of tools to the agent.
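A short sketch of registering tools, assuming FunctionTool is imported from camel.toolkits; get_weather and get_time are illustrative helpers, not part of the library:

from camel.toolkits import FunctionTool

def get_weather(city: str) -> str:
    """Return a short weather description for the given city (stub)."""
    return f"The weather in {city} is sunny."

def get_time(timezone: str) -> str:
    """Return the current time in the given timezone (stub)."""
    return f"It is 12:00 in {timezone}."

agent.add_tool(get_weather)                # a plain callable is accepted directly
agent.add_tools([FunctionTool(get_time)])  # or pass FunctionTool instances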

add_external_tool

def add_external_tool(self, tool: Union[FunctionTool, Callable, Dict[str, Any]]):

remove_tool

def remove_tool(self, tool_name: str):
Remove a tool from the agent by name. Parameters:
  • tool_name (str): The name of the tool to remove.
Returns: bool: Whether the tool was successfully removed.

remove_tools

def remove_tools(self, tool_names: List[str]):
Remove a list of tools from the agent by name.

remove_external_tool

def remove_external_tool(self, tool_name: str):
Remove an external tool from the agent by name. Parameters:
  • tool_name (str): The name of the tool to remove.
Returns: bool: Whether the tool was successfully removed.

update_memory

def update_memory(
    self,
    message: BaseMessage,
    role: OpenAIBackendRole,
    timestamp: Optional[float] = None
):
Updates the agent memory with a new message. If the single message exceeds the model’s context window, it will be automatically split into multiple smaller chunks before being written into memory. This prevents later failures in ScoreBasedContextCreator where an over-sized message cannot fit into the available token budget at all. This slicing logic handles both regular text messages (in the content field) and long tool call results (in the result field of a FunctionCallingMessage). Parameters:
  • message (BaseMessage): The new message to add to the stored messages.
  • role (OpenAIBackendRole): The backend role type.
  • timestamp (Optional[float], optional): Custom timestamp for the memory record. If None, the current time will be used. (default: :obj:None)
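A small sketch of recording an external message into memory, assuming the camel.messages and camel.types import paths:

from camel.messages import BaseMessage
from camel.types import OpenAIBackendRole

note = BaseMessage.make_user_message(
    role_name="User",
    content="Remember that the project deadline is Friday.",
)
agent.update_memory(note, OpenAIBackendRole.USER)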

load_memory

def load_memory(self, memory: AgentMemory):
Load the provided memory into the agent. Parameters:
  • memory (AgentMemory): The memory to load into the agent.
Returns: None

load_memory_from_path

def load_memory_from_path(self, path: str):
Loads memory records from a JSON file filtered by this agent’s ID. Parameters:
  • path (str): The file path to a JSON memory file that uses JsonStorage.

save_memory

def save_memory(self, path: str):
Retrieves the current conversation data from memory and writes it into a JSON file using JsonStorage. Parameters:
  • path (str): Target file path to store JSON data.
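A brief round-trip sketch; the file path is illustrative, and since records are filtered by agent ID on load, the restoring agent reuses the same agent_id:

# Persist the current conversation to disk.
agent.save_memory("/tmp/agent_memory.json")

# Restore it into a fresh agent that shares the original agent's ID.
restored = ChatAgent("You are a helpful assistant.", agent_id=agent.agent_id)
restored.load_memory_from_path("/tmp/agent_memory.json")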

clear_memory

def clear_memory(self):
Clears the agent's memory and resets it to its initial state.
Returns: None

_generate_system_message_for_output_language

def _generate_system_message_for_output_language(self):
Generates a system message instructing the agent to respond in the configured output language.
Returns: BaseMessage: The new system message.

init_messages

def init_messages(self):
Initializes the stored messages list with the current system message.

record_message

def record_message(self, message: BaseMessage):
Records the externally provided message into the agent memory as if it were an answer generated by the :obj:ChatAgent backend. Currently, the choice of the critic is submitted with this method. Parameters:
  • message (BaseMessage): An external message to be recorded in the memory.

_try_format_message

def _try_format_message(self, message: BaseMessage, response_format: Type[BaseModel]):
Returns: bool: Whether the message is formatted successfully (or no format is needed).

_check_tools_strict_compatibility

def _check_tools_strict_compatibility(self):
Returns: bool: True if all tools are strict mode compatible, False otherwise.

_convert_response_format_to_prompt

def _convert_response_format_to_prompt(self, response_format: Type[BaseModel]):
Convert a Pydantic response format to a prompt instruction. Parameters:
  • response_format (Type[BaseModel]): The Pydantic model class.
Returns: str: A prompt instruction requesting the specific format.
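A hypothetical illustration of the idea, not the library's actual implementation: the Pydantic model's JSON schema is rendered into a plain-text instruction that can be appended to the prompt.

import json
from pydantic import BaseModel

class MovieReview(BaseModel):
    title: str
    rating: int

def response_format_to_prompt(response_format: type[BaseModel]) -> str:
    # Illustrative only: ask the model to answer as JSON matching the schema.
    schema = json.dumps(response_format.model_json_schema(), indent=2)
    return (
        "Please respond ONLY with a JSON object matching this schema:\n"
        f"{schema}"
    )

print(response_format_to_prompt(MovieReview))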

_handle_response_format_with_non_strict_tools

def _handle_response_format_with_non_strict_tools(
    self,
    input_message: Union[BaseMessage, str],
    response_format: Optional[Type[BaseModel]] = None
):
Handle response format when tools are not strict mode compatible. Parameters:
  • input_message: The original input message.
  • response_format: The requested response format.
Returns: Tuple: (modified_message, modified_response_format, used_prompt_formatting)

_apply_prompt_based_parsing

def _apply_prompt_based_parsing(
    self,
    response: ModelResponse,
    original_response_format: Type[BaseModel]
):
Apply manual parsing when using prompt-based formatting. Parameters:
  • response: The model response to parse.
  • original_response_format: The original response format class.

_format_response_if_needed

def _format_response_if_needed(
    self,
    response: ModelResponse,
    response_format: Optional[Type[BaseModel]] = None
):
Format the response if needed. This function won't format the response in the following cases:
  1. The response format is None (not provided)
  2. The response is empty

step

def step(
    self,
    input_message: Union[BaseMessage, str],
    response_format: Optional[Type[BaseModel]] = None
):
Executes a single step in the chat session, generating a response to the input message. Parameters:
  • input_message (Union[BaseMessage, str]): The input message for the agent. If provided as a BaseMessage, the role is adjusted to user to indicate an external message.
  • response_format (Optional[Type[BaseModel]], optional): A Pydantic model defining the expected structure of the response. Used to generate a structured response if provided. (default: :obj:None)
Returns: Union[ChatAgentResponse, StreamingChatAgentResponse]: If stream is False, returns a ChatAgentResponse. If stream is True, returns a StreamingChatAgentResponse that behaves like ChatAgentResponse but can also be iterated for streaming updates.
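A short sketch of requesting structured output via response_format; the structured content is returned in the message content, and whether a parsed object is also attached to the message depends on the backend and formatting path:

from pydantic import BaseModel

class CityInfo(BaseModel):
    name: str
    country: str
    population: int

response = agent.step(
    "Give me basic facts about Paris.",
    response_format=CityInfo,
)
print(response.msgs[0].content)  # JSON-formatted content matching CityInfo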

chat_history

def chat_history(self):

_create_token_usage_tracker

def _create_token_usage_tracker(self):
Returns: Dict[str, int]: A dictionary for tracking token usage.

_update_token_usage_tracker

def _update_token_usage_tracker(self, tracker: Dict[str, int], usage_dict: Dict[str, int]):
Updates a token usage tracker with values from a usage dictionary. Parameters:
  • tracker (Dict[str, int]): The token usage tracker to update.
  • usage_dict (Dict[str, int]): The usage dictionary with new values.

_convert_to_chatagent_response

def _convert_to_chatagent_response(
    self,
    response: ModelResponse,
    tool_call_records: List[ToolCallingRecord],
    num_tokens: int,
    external_tool_call_requests: Optional[List[ToolCallRequest]],
    step_api_prompt_tokens: int = 0,
    step_api_completion_tokens: int = 0,
    step_api_total_tokens: int = 0
):
Parse the final model response into the chat agent response.

_record_final_output

def _record_final_output(self, output_messages: List[BaseMessage]):
Log final messages or warnings about multiple responses.

_get_model_response

def _get_model_response(
    self,
    openai_messages: List[OpenAIMessage],
    num_tokens: int,
    current_iteration: int = 0,
    response_format: Optional[Type[BaseModel]] = None,
    tool_schemas: Optional[List[Dict[str, Any]]] = None,
    prev_num_openai_messages: int = 0
):
Internal function for obtaining the model response during an agent step. Parameters:
  • openai_messages (List[OpenAIMessage]): The OpenAI messages to process.
  • num_tokens (int): The number of tokens in the context.
  • current_iteration (int): The current iteration of the step.
  • response_format (Optional[Type[BaseModel]]): The response format to use.
  • tool_schemas (Optional[List[Dict[str, Any]]]): The tool schemas to use.
  • prev_num_openai_messages (int): The number of openai messages logged in the previous iteration.
Returns: ModelResponse: The model response.

_sanitize_messages_for_logging

def _sanitize_messages_for_logging(self, messages, prev_num_openai_messages: int):
Sanitize OpenAI messages for logging by replacing base64 image data with a simple message and a link to view the image. Parameters:
  • messages (List[OpenAIMessage]): The OpenAI messages to sanitize.
  • prev_num_openai_messages (int): The number of openai messages logged in the previous iteration.
Returns: List[OpenAIMessage]: The sanitized OpenAI messages.

_step_get_info

def _step_get_info(
    self,
    output_messages: List[BaseMessage],
    finish_reasons: List[str],
    usage_dict: Dict[str, int],
    response_id: str,
    tool_calls: List[ToolCallingRecord],
    num_tokens: int,
    external_tool_call_requests: Optional[List[ToolCallRequest]] = None
):
Process the output of a chat step and gather information about the step. This method checks for termination conditions, updates the agent’s state, and collects information about the chat step, including tool calls and termination reasons. Parameters:
  • output_messages (List[BaseMessage]): The messages generated in this step.
  • finish_reasons (List[str]): The reasons for finishing the generation for each message.
  • usage_dict (Dict[str, int]): Dictionary containing token usage information.
  • response_id (str): The ID of the response from the model.
  • tool_calls (List[ToolCallingRecord]): Records of function calls made during this step.
  • num_tokens (int): The number of tokens used in this step.
  • external_tool_call_requests (Optional[List[ToolCallRequest]], optional): The requests for external tool calls. (default: :obj:None)
Returns: Dict[str, Any]: A dictionary containing information about the chat step, including termination status, reasons, and tool call information.
Note: This method iterates over all response terminators and checks if any of them signal termination. If a terminator signals termination, the agent's state is updated accordingly, and the termination reason is recorded.

_handle_batch_response

def _handle_batch_response(self, response: ChatCompletion):
Process a batch response from the model and extract the necessary information. Parameters:
  • response (ChatCompletion): Model response.
Returns: ModelResponse: Parsed model response.

_step_terminate

def _step_terminate(
    self,
    num_tokens: int,
    tool_calls: List[ToolCallingRecord],
    termination_reason: str
):
Create a response when the agent execution is terminated. This method is called when the agent needs to terminate its execution for reasons such as the token limit being exceeded or other termination conditions being met. It creates a response with empty messages but includes termination information in the info dictionary. Parameters:
  • num_tokens (int): Number of tokens in the messages.
  • tool_calls (List[ToolCallingRecord]): List of information objects of functions called in the current step.
  • termination_reason (str): String describing the reason for termination.
Returns: ChatAgentResponse: A response object with empty message list, terminated flag set to True, and an info dictionary containing termination details, token counts, and tool call information.

_execute_tool

def _execute_tool(self, tool_call_request: ToolCallRequest):
Execute the tool with arguments following the model’s response. Parameters:
  • tool_call_request (ToolCallRequest): The tool call request.
Returns: FunctionCallingRecord: A struct for logging information about this function call.

_record_tool_calling

def _record_tool_calling(
    self,
    func_name: str,
    args: Dict[str, Any],
    result: Any,
    tool_call_id: str,
    mask_output: bool = False
):
Record the tool calling information in the memory, and return the tool calling record. Parameters:
  • func_name (str): The name of the tool function called.
  • args (Dict[str, Any]): The arguments passed to the tool.
  • result (Any): The result returned by the tool execution.
  • tool_call_id (str): A unique identifier for the tool call.
  • mask_output (bool, optional): Whether to return a sanitized placeholder instead of the raw tool output. (default: :obj:False)
Returns: ToolCallingRecord: A struct containing information about this tool call.

_stream

def _stream(
    self,
    input_message: Union[BaseMessage, str],
    response_format: Optional[Type[BaseModel]] = None
):
Executes a streaming step in the chat session, yielding intermediate responses as they are generated. Parameters:
  • input_message (Union[BaseMessage, str]): The input message for the agent.
  • response_format (Optional[Type[BaseModel]], optional): A Pydantic model defining the expected structure of the response.
Yields: ChatAgentResponse: Intermediate responses containing partial content, tool calls, and other information as they become available.

_get_token_count

def _get_token_count(self, content: str):
Get token count for content with fallback.

_stream_response

def _stream_response(
    self,
    openai_messages: List[OpenAIMessage],
    num_tokens: int,
    response_format: Optional[Type[BaseModel]] = None
):
Internal method to handle streaming responses with tool calls.

_process_stream_chunks_with_accumulator

def _process_stream_chunks_with_accumulator(
    self,
    stream: Stream[ChatCompletionChunk],
    content_accumulator: StreamContentAccumulator,
    accumulated_tool_calls: Dict[str, Any],
    tool_call_records: List[ToolCallingRecord],
    step_token_usage: Dict[str, int],
    response_format: Optional[Type[BaseModel]] = None
):
Process streaming chunks with content accumulator.

_accumulate_tool_calls

def _accumulate_tool_calls(
    self,
    tool_call_deltas: List[Any],
    accumulated_tool_calls: Dict[str, Any]
):
Accumulate tool call chunks and return True when any tool call is complete. Parameters:
  • tool_call_deltas (List[Any]): List of tool call deltas.
  • accumulated_tool_calls (Dict[str, Any]): Dictionary of accumulated tool calls.
Returns: bool: True if any tool call is complete, False otherwise.
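A hypothetical sketch of the accumulation pattern for OpenAI-style streaming tool call deltas (id and name arrive once, arguments arrive as string fragments); this illustrates the technique rather than reproducing the method's exact code:

import json

def accumulate_tool_calls(tool_call_deltas, accumulated_tool_calls):
    # Merge streaming deltas into per-call buffers keyed by the delta index.
    for delta in tool_call_deltas:
        entry = accumulated_tool_calls.setdefault(
            str(delta.index), {"id": "", "name": "", "arguments": ""}
        )
        if getattr(delta, "id", None):
            entry["id"] = delta.id
        fn = getattr(delta, "function", None)
        if fn is not None and fn.name:
            entry["name"] = fn.name
        if fn is not None and fn.arguments:
            entry["arguments"] += fn.arguments  # arguments stream in fragments

    # Heuristic completeness check: name known and arguments form valid JSON.
    for entry in accumulated_tool_calls.values():
        if not entry["name"] or not entry["arguments"]:
            continue
        try:
            json.loads(entry["arguments"])
            return True
        except json.JSONDecodeError:
            continue
    return False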

_execute_tools_sync_with_status_accumulator

def _execute_tools_sync_with_status_accumulator(
    self,
    accumulated_tool_calls: Dict[str, Any],
    tool_call_records: List[ToolCallingRecord]
):
Execute multiple tools synchronously with proper content accumulation, using threads and a queue for non-blocking status streaming.

_execute_tool_from_stream_data

def _execute_tool_from_stream_data(self, tool_call_data: Dict[str, Any]):
Execute a tool from accumulated stream data.

_create_error_response

def _create_error_response(
    self,
    error_message: str,
    tool_call_records: List[ToolCallingRecord]
):
Create an error response for streaming.

_record_assistant_tool_calls_message

def _record_assistant_tool_calls_message(self, accumulated_tool_calls: Dict[str, Any], content: str = ''):
Record the assistant message that contains tool calls. This method creates and records an assistant message that includes the tool calls information, which is required by OpenAI’s API format.

_create_streaming_response_with_accumulator

def _create_streaming_response_with_accumulator(
    self,
    accumulator: StreamContentAccumulator,
    new_content: str,
    step_token_usage: Dict[str, int],
    response_id: str = '',
    tool_call_records: Optional[List[ToolCallingRecord]] = None
):
Create a streaming response using content accumulator.

get_usage_dict

def get_usage_dict(self, output_messages: List[BaseMessage], prompt_tokens: int):
Get usage dictionary when using the stream mode. Parameters:
  • output_messages (list): List of output messages.
  • prompt_tokens (int): Number of input prompt tokens.
Returns: dict: Usage dictionary.

add_model_scheduling_strategy

def add_model_scheduling_strategy(self, name: str, strategy_fn: Callable):
Add a user-provided scheduling strategy method to ModelManager. Parameters:
  • name (str): The name of the strategy.
  • strategy_fn (Callable): The scheduling strategy function.

clone

def clone(self, with_memory: bool = False):
Creates a new instance of :obj:ChatAgent with the same configuration as the current instance. Parameters:
  • with_memory (bool): Whether to copy the memory (conversation history) to the new agent. If True, the new agent will have the same conversation history. If False, the new agent will have a fresh memory with only the system message. (default: :obj:False)
Returns: ChatAgent: A new instance of :obj:ChatAgent with the same configuration.
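A quick sketch of both cloning modes:

# A fresh agent with identical configuration but an empty conversation history.
fresh_copy = agent.clone()

# A copy that also carries over the conversation so far.
continued_copy = agent.clone(with_memory=True)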

_clone_tools

def _clone_tools(self):
Returns: Tuple containing:
  • List of cloned tools/functions
  • List of RegisteredAgentToolkit instances that need registration

repr

def __repr__(self):
Returns: str: The string representation of the :obj:ChatAgent.

to_mcp

def to_mcp(
    self,
    name: str = 'CAMEL-ChatAgent',
    description: str = 'A helpful assistant using the CAMEL AI framework.',
    dependencies: Optional[List[str]] = None,
    host: str = 'localhost',
    port: int = 8000
):
Expose this ChatAgent as an MCP server. Parameters:
  • name (str): Name of the MCP server. (default: :obj:CAMEL-ChatAgent)
  • description (str): Description of the agent. If None, a generic description is used. (default: :obj:A helpful assistant using the CAMEL AI framework.)
  • dependencies (Optional[List[str]]): Additional dependencies for the MCP server. (default: :obj:None)
  • host (str): Host to bind to for HTTP transport. (default: :obj:localhost)
  • port (int): Port to bind to for HTTP transport. (default: :obj:8000)
Returns: FastMCP: An MCP server instance that can be run.
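A brief serving sketch; the returned object is a FastMCP instance, and launching it with mcp_server.run() follows FastMCP's own interface (shown here as an assumption):

mcp_server = agent.to_mcp(
    name="my-camel-agent",
    description="Answers questions about internal documentation.",
    port=8001,
)
mcp_server.run()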