Documentation Index
Fetch the complete documentation index at: https://docs.camel-ai.org/llms.txt
Use this file to discover all available pages before exploring further.
_cleanup_temp_files
def _cleanup_temp_files():
StreamContentAccumulator
class StreamContentAccumulator:
Manages content accumulation across streaming responses to ensure
all responses contain complete cumulative content.
init
set_base_content
def set_base_content(self, content: str):
Set the base content (usually empty or pre-tool content).
add_streaming_content
def add_streaming_content(self, new_content: str):
Add new streaming content.
add_reasoning_content
def add_reasoning_content(self, new_reasoning: str):
Add new reasoning content.
def add_tool_status(self, status_message: str):
Add a tool status message.
get_full_content
def get_full_content(self):
Get the complete accumulated content.
get_full_reasoning_content
def get_full_reasoning_content(self):
Get the complete accumulated reasoning content.
get_content_with_new_status
def get_content_with_new_status(self, status_message: str):
Get content with a new status message appended.
reset_streaming_content
def reset_streaming_content(self):
Reset only the streaming content, keep base and tool status.
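A minimal usage sketch of the accumulator's contract, based only on the methods documented here; the exact assembly order of base content, tool statuses, and streamed text inside get_full_content is an internal detail:
acc = StreamContentAccumulator()
acc.set_base_content("")                  # content present before streaming starts
acc.add_streaming_content("Hello, ")      # delta from the first chunk
acc.add_streaming_content("world.")       # delta from the second chunk
acc.add_tool_status("[search tool running...]")
full = acc.get_full_content()             # complete cumulative content so far
acc.reset_streaming_content()             # drop streamed text, keep base + statuses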
StreamingChatAgentResponse
class StreamingChatAgentResponse:
A wrapper that makes streaming responses compatible with
non-streaming code.
This class wraps a Generator[ChatAgentResponse, None, None] and provides
the same interface as ChatAgentResponse, so existing code doesn’t need to
change.
init
def __init__(self, generator: Generator[ChatAgentResponse, None, None]):
_ensure_latest_response
def _ensure_latest_response(self):
Ensure we have the latest response by consuming the generator.
msgs
Get messages from the latest response.
terminated
Get terminated status from the latest response.
info
Get info from the latest response.
msg
Get the single message if there’s exactly one message.
iter
Make this object iterable.
getattr
def __getattr__(self, name):
Forward any other attribute access to the latest response.
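A sketch of both consumption styles, assuming agent is a ChatAgent whose model backend streams; whether partial content is a delta or cumulative depends on the stream_accumulate setting documented below:
streaming = agent.step("Explain MCP in one paragraph.")

# Non-streaming style: attribute access consumes the generator and
# exposes the latest (final) response, as documented above.
print(streaming.msgs[0].content)

# Streaming style: iterate to observe intermediate responses.
for partial in agent.step("Explain MCP in one paragraph."):
    print(partial.msg.content)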
AsyncStreamingChatAgentResponse
class AsyncStreamingChatAgentResponse:
A wrapper that makes async streaming responses awaitable and
compatible with non-streaming code.
This class wraps an AsyncGenerator[ChatAgentResponse, None] and provides
both awaitable and async iterable interfaces.
init
def __init__(self, async_generator: AsyncGenerator[ChatAgentResponse, None]):
await
Make this object awaitable - returns the final response.
aiter
Make this object async iterable.
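A sketch of the two async interfaces; the astep method name is assumed here as the async counterpart of step:
import asyncio

async def main():
    # Await the wrapper directly for the final ChatAgentResponse.
    final = await agent.astep("Summarize our discussion.")
    print(final.msgs[0].content)

    # Or consume intermediate responses with async iteration.
    async for partial in agent.astep("Summarize our discussion."):
        print(partial.msg.content)

asyncio.run(main())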
ChatAgent
class ChatAgent(BaseAgent):
Class for managing conversations of CAMEL Chat Agents.
Parameters:
- system_message (Union[BaseMessage, str], optional): The system message for the chat agent. (default: :obj:None)
- model (Union[BaseModelBackend, Tuple[str, str], str, ModelType, Tuple[ModelPlatformType, ModelType], List[BaseModelBackend], List[str], List[ModelType], List[Tuple[str, str]], List[Tuple[ModelPlatformType, ModelType]]], optional): The model backend(s) to use. Can be a single instance, a specification (string, enum, or tuple), or a list of instances or specifications to be managed by ModelManager. If a list of specifications (not BaseModelBackend instances) is provided, they will be instantiated using ModelFactory. (default: :obj:ModelPlatformType.DEFAULT with ModelType.DEFAULT)
- memory (AgentMemory, optional): The agent memory for managing chat messages. If None, a :obj:ChatHistoryMemory will be used. (default: :obj:None)
- message_window_size (int, optional): The maximum number of previous messages to include in the context window. If None, no windowing is performed. (default: :obj:None)
- summarize_threshold (int, optional): The percentage of the context window that triggers summarization. If None, summarization is triggered when the context window is full. (default: :obj:None)
- token_limit (int, optional): The maximum number of tokens allowed for the context window. If None, uses the model's default token limit. This can be used to restrict the context size below the model's maximum capacity. (default: :obj:None)
- output_language (str, optional): The language to be output by the agent. (default: :obj:None)
- tools (Optional[List[Union[FunctionTool, Callable]]], optional): List of available :obj:FunctionTool or :obj:Callable. (default: :obj:None)
- toolkits_to_register_agent (Optional[List[RegisteredAgentToolkit]], optional): List of toolkit instances that inherit from :obj:RegisteredAgentToolkit. The agent will register itself with these toolkits, allowing them to access the agent instance. Note: this does NOT add the toolkit's tools to the agent; to use tools from these toolkits, pass them explicitly via the tools parameter. (default: :obj:None)
- external_tools (Optional[List[Union[FunctionTool, Callable, Dict[str, Any]]]], optional): List of external tools (:obj:FunctionTool, :obj:Callable, or :obj:Dict[str, Any]) bound to one chat agent. When these tools are called, the agent will return the request directly instead of processing it. (default: :obj:None)
- response_terminators (List[ResponseTerminator], optional): List of :obj:ResponseTerminator to check whether the task is complete. When set, the agent will keep prompting the model until a terminator signals completion. Note: you must define the termination signal (e.g., a keyword) in your system prompt so the model knows what to output. (default: :obj:None)
- scheduling_strategy (str): Name of the function that defines how to select the next model in ModelManager. (default: :obj:round_robin)
- max_iteration (Optional[int], optional): Maximum number of model-calling iterations allowed per step. If None (default), there is no explicit limit. If 1, it performs a single model call. If N > 1, it allows up to N model calls. (default: :obj:None)
- agent_id (str, optional): The ID of the agent. If not provided, a random UUID will be generated. (default: :obj:None)
- stop_event (Optional[threading.Event], optional): Event to signal termination of the agent's operation. When set, the agent will terminate its execution. (default: :obj:None)
- tool_execution_timeout (Optional[float], optional): Timeout for individual tool execution. If None, wait indefinitely.
- mask_tool_output (Optional[bool]): Whether to return a sanitized placeholder instead of the raw tool output. (default: :obj:False)
- pause_event (Optional[Union[threading.Event, asyncio.Event]]): Event to signal pausing of the agent's operation. When cleared, the agent will pause its execution. Use threading.Event for sync operations or asyncio.Event for async operations. (default: :obj:None)
- prune_tool_calls_from_memory (bool): Whether to remove tool call messages from memory after response generation to save token usage. When enabled, removes FUNCTION/TOOL role messages and ASSISTANT messages with tool_calls after each step. (default: :obj:False)
- enable_snapshot_clean (bool, optional): Whether to clean snapshot markers and references from historical tool outputs in memory. This removes verbose DOM markers (like [ref=…]) from older tool results while keeping the latest output intact for immediate use. (default: :obj:False)
- retry_attempts (int, optional): Maximum number of retry attempts for rate limit errors. (default: :obj:3)
- retry_delay (float, optional): Initial delay in seconds between retries. Uses exponential backoff. (default: :obj:1.0)
- step_timeout (Optional[float], optional): Timeout in seconds for the entire step operation. If None, no timeout is applied. (default: :obj:None)
- stream_accumulate (Optional[bool], optional): When True, partial streaming updates return accumulated content. When False, partial updates return only the incremental delta (recommended). If None, defaults to False with a deprecation warning for users who previously relied on the old default (True). (default: :obj:None, which behaves as :obj:False)
- summary_window_ratio (float, optional): Maximum fraction of the total context window that can be occupied by summary information. Used to limit how much of the model's context is reserved for summarization results. (default: :obj:0.6)
init
def __init__(
self,
system_message: Optional[Union[BaseMessage, str]] = None,
model: Optional[Union[BaseModelBackend, ModelManager, Tuple[str, str], str, ModelType, Tuple[ModelPlatformType, ModelType], List[BaseModelBackend], List[str], List[ModelType], List[Tuple[str, str]], List[Tuple[ModelPlatformType, ModelType]]]] = None,
memory: Optional[AgentMemory] = None,
message_window_size: Optional[int] = None,
summarize_threshold: Optional[int] = 50,
token_limit: Optional[int] = None,
output_language: Optional[str] = None,
tools: Optional[List[Union[FunctionTool, Callable]]] = None,
toolkits_to_register_agent: Optional[List[RegisteredAgentToolkit]] = None,
external_tools: Optional[List[Union[FunctionTool, Callable, Dict[str, Any]]]] = None,
response_terminators: Optional[List[ResponseTerminator]] = None,
scheduling_strategy: str = 'round_robin',
max_iteration: Optional[int] = None,
agent_id: Optional[str] = None,
stop_event: Optional[threading.Event] = None,
tool_execution_timeout: Optional[float] = Constants.TIMEOUT_THRESHOLD,
mask_tool_output: bool = False,
pause_event: Optional[Union[threading.Event, asyncio.Event]] = None,
prune_tool_calls_from_memory: bool = False,
enable_snapshot_clean: bool = False,
retry_attempts: int = 3,
retry_delay: float = 1.0,
step_timeout: Optional[float] = Constants.TIMEOUT_THRESHOLD,
stream_accumulate: Optional[bool] = None,
summary_window_ratio: float = 0.6
):
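A minimal construction sketch, assuming the standard camel.agents and camel.types import paths; the (platform, type) tuple is one of the model specifications accepted above:
from camel.agents import ChatAgent
from camel.types import ModelPlatformType, ModelType

agent = ChatAgent(
    system_message="You are a concise technical assistant.",
    model=(ModelPlatformType.DEFAULT, ModelType.DEFAULT),
    max_iteration=3,      # allow up to 3 model calls per step
    retry_attempts=3,     # retry rate-limit errors with exponential backoff
)
response = agent.step("What does ModelManager do?")
print(response.msgs[0].content)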
reset
Resets the :obj:ChatAgent to its initial state.
_update_token_cache
def _update_token_cache(self, usage_dict: Dict[str, Any], message_count: int):
Update the token count cache from LLM response usage.
Parameters:
- usage_dict (Dict[str, Any]): Usage dictionary from LLM response.
- message_count (int): Number of messages sent to the LLM.
_resolve_models
def _resolve_models(
self,
model: Optional[Union[BaseModelBackend, Tuple[str, str], str, ModelType, Tuple[ModelPlatformType, ModelType], List[BaseModelBackend], List[str], List[ModelType], List[Tuple[str, str]], List[Tuple[ModelPlatformType, ModelType]]]]
):
Resolves model specifications into model backend instances.
This method handles various input formats for model specifications and
returns the appropriate model backend(s).
Parameters:
- model: Model specification in various formats including single model, list of models, or model type specifications.
Returns:
Union[BaseModelBackend, List[BaseModelBackend]]: Resolved model
backend(s).
_resolve_model_list
def _resolve_model_list(self, model_list: list):
Resolves a list of model specifications into model backend
instances.
Parameters:
- model_list (list): List of model specifications in various formats.
Returns:
Union[BaseModelBackend, List[BaseModelBackend]]: Resolved model
backend(s).
system_message
def system_message(self):
Returns the system message for the agent.
tool_dict
Returns a dictionary of internal tools.
token_limit
Returns the token limit for the agent’s context window.
output_language
def output_language(self):
Returns the output language for the agent.
output_language
def output_language(self, value: str):
Set the output language for the agent.
Note that this will clear the message history.
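For example (note the history-clearing side effect documented above):
agent.output_language = "French"   # subsequent replies in French; clears history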
memory
Returns the agent memory.
memory
def memory(self, value: AgentMemory):
Set the agent memory.
When setting a new memory, the system message is automatically
added after existing system messages, while preserving existing
memory data.
Parameters:
- value (AgentMemory): The new agent memory to use.
set_context_utility
def set_context_utility(self, context_utility: Optional[ContextUtility]):
Set the context utility for the agent.
This allows external components (like SingleAgentWorker) to provide
a shared context utility instance for workflow management.
Parameters:
- context_utility (ContextUtility, optional): The context utility to use. If None, the agent will create its own when needed.
def _get_full_tool_schemas(self):
Returns a list of tool schemas of all tools, including internal
and external tools.
def _serialize_tool_args(args: Dict[str, Any]):
def _build_tool_signature(cls, func_name: str, args: Dict[str, Any]):
def _describe_tool_call(self, record: Optional[ToolCallingRecord]):
def _update_last_tool_call_state(self, record: Optional[ToolCallingRecord]):
Track the most recent tool call and its identifying signature.
_append_user_messages_section
def _append_user_messages_section(summary_content: str, user_messages: List[str]):
_reset_summary_state
def _reset_summary_state(self):
_get_context_with_summarization
def _get_context_with_summarization(self):
Get context and trigger summarization if needed.
_calculate_next_summary_threshold
def _calculate_next_summary_threshold(self):
Returns:
int: The token count threshold for next summarization.
_update_memory_with_summary
def _update_memory_with_summary(self, summary: str, include_summaries: bool = False):
Update memory with summary result.
This method handles memory clearing and restoration of summaries based
on whether it’s a progressive or full compression.
def _get_external_tool_names(self):
Returns a set of external tool names.
def add_tool(self, tool: Union[FunctionTool, Callable]):
Add a tool to the agent.
def add_tools(self, tools: List[Union[FunctionTool, Callable]]):
Add a list of tools to the agent.
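A sketch of registering tools after construction; per the tools parameter docs, plain callables are accepted alongside FunctionTool instances:
from datetime import datetime, timezone

def get_utc_time() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()

agent.add_tool(get_utc_time)        # single callable, wrapped for the agent
# agent.add_tools([tool_a, tool_b])  # or register several at once (hypothetical tools)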
def _serialize_tool_result(self, result: Any):
def _truncate_tool_result(self, func_name: str, result: Any):
Truncate tool result if it exceeds the maximum token limit.
Parameters:
- func_name (str): The name of the tool function called.
- result (Any): The result returned by the tool execution.
Returns:
Tuple[Any, bool]: A tuple containing:
- The (possibly truncated) result
- A boolean indicating whether truncation occurred
_clean_snapshot_line
def _clean_snapshot_line(self, line: str):
Clean a single snapshot line by removing prefixes and references.
This method handles snapshot lines in the format:
- [prefix] “quoted text” [attributes] [ref=…]: description
It preserves:
- Quoted text content (including brackets inside quotes)
- Description text after the colon
It removes:
- Line prefixes (e.g., "- button", "- tooltip", "generic:")
- Attribute markers (e.g., [disabled], [ref=e47])
- Lines with only element types
- All indentation
Parameters:
- line: The original line content.
Returns:
The cleaned line content, or an empty string if the line should be
removed.
_clean_snapshot_content
def _clean_snapshot_content(self, content: str):
Clean snapshot content by removing prefixes, references, and
deduplicating lines.
This method identifies snapshot lines (containing element keywords or
references) and cleans them while preserving non-snapshot content.
It also handles JSON-formatted tool outputs with snapshot fields.
Parameters:
- content: The original snapshot content.
Returns:
The cleaned content with deduplicated lines.
_clean_text_snapshot
def _clean_text_snapshot(self, content: str):
Clean plain text snapshot content.
This method:
- Removes all indentation
- Deletes empty lines
- Deduplicates all lines
- Cleans snapshot-specific markers
Parameters:
- content: The original snapshot text.
Returns:
The cleaned content with deduplicated lines, no indentation,
and no empty lines.
def _register_tool_output_for_cache(
self,
func_name: str,
tool_call_id: str,
result_text: str,
records: List[MemoryRecord]
):
def _process_tool_output_cache(self):
_clean_snapshot_in_memory
def _clean_snapshot_in_memory(self, entry: _ToolOutputHistoryEntry):
def add_external_tool(self, tool: Union[FunctionTool, Callable, Dict[str, Any]]):
def remove_tool(self, tool_name: str):
Remove a tool from the agent by name.
Parameters:
- tool_name (str): The name of the tool to remove.
Returns:
bool: Whether the tool was successfully removed.
def remove_tools(self, tool_names: List[str]):
Remove a list of tools from the agent by name.
def remove_external_tool(self, tool_name: str):
Remove an external tool from the agent by name.
Parameters:
- tool_name (str): The name of the tool to remove.
Returns:
bool: Whether the tool was successfully removed.
update_memory
def update_memory(
self,
message: BaseMessage,
role: OpenAIBackendRole,
timestamp: Optional[float] = None,
return_records: bool = False
):
Updates the agent memory with a new message.
Parameters:
- message (BaseMessage): The new message to add to the stored messages.
- role (OpenAIBackendRole): The backend role type.
- timestamp (Optional[float], optional): Custom timestamp for the memory record. If None, the current time will be used. (default: :obj:None)
- return_records (bool, optional): When True, return the memory records that were written. (default: :obj:False)
Returns:
Optional[List[MemoryRecord]]: The records that were written when return_records is True; otherwise None.
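A sketch of writing a message into memory directly, assuming the standard camel.messages and camel.types import paths and the BaseMessage.make_user_message convenience constructor:
from camel.messages import BaseMessage
from camel.types import OpenAIBackendRole

note = BaseMessage.make_user_message(
    role_name="user",
    content="Remember: the deployment window is Friday 14:00 UTC.",
)
records = agent.update_memory(note, OpenAIBackendRole.USER, return_records=True)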
load_memory
def load_memory(self, memory: AgentMemory):
Load the provided memory into the agent.
Parameters:
- memory (AgentMemory): The memory to load into the agent.
Returns:
None
load_memory_from_path
def load_memory_from_path(self, path: str):
Loads memory records from a JSON file filtered by this agent’s ID.
Parameters:
- path (str): The file path to a JSON memory file that uses JsonStorage.
save_memory
def save_memory(self, path: str):
Retrieves the current conversation data from memory and writes it
into a JSON file using JsonStorage.
Parameters:
- path (str): Target file path to store JSON data.
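A round-trip sketch using the two methods above; reading agent_id back as an attribute is an assumption:
agent.save_memory("./agent_memory.json")

# Later: a fresh agent with the same ID can restore its own records,
# since load_memory_from_path filters by agent ID.
restored = ChatAgent(
    system_message="You are a concise technical assistant.",
    agent_id=agent.agent_id,
)
restored.load_memory_from_path("./agent_memory.json")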
summarize
def summarize(
self,
filename: Optional[str] = None,
summary_prompt: Optional[str] = None,
response_format: Optional[Type[BaseModel]] = None,
working_directory: Optional[Union[str, Path]] = None,
include_summaries: bool = False,
add_user_messages: bool = True
):
Summarize the agent’s current conversation context and persist it
to a markdown file.
.. deprecated:: 0.2.80
Use :meth:asummarize for async/await support and better
performance in parallel summarization workflows.
Parameters:
- filename (Optional[str]): The base filename (without extension) to use for the markdown file. Defaults to a timestamped name when not provided.
- summary_prompt (Optional[str]): Custom prompt for the summarizer. When omitted, a default prompt highlighting key decisions, action items, and open questions is used.
- response_format (Optional[Type[BaseModel]]): A Pydantic model defining the expected structure of the response. If provided, the summary will be generated as structured output and included in the result.
- include_summaries (bool): Whether to include previously generated summaries in the content to be summarized. If False (default), only non-summary messages will be summarized. If True, all messages including previous summaries will be summarized (full compression). (default: :obj:False)
- working_directory (Optional[str|Path]): Optional directory to save the markdown summary file. If provided, overrides the default directory used by ContextUtility.
- add_user_messages (bool): Whether to add user messages to the summary. (default: :obj:True)
Returns:
Dict[str, Any]: A dictionary containing the summary text, file
path, status message, and optionally structured_summary if
response_format was provided.
See Also:
:meth:asummarize: Async version for non-blocking LLM calls.
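A usage sketch; the exact result keys are not specified above, so the names used here are assumptions:
result = agent.summarize(
    filename="sprint_review",
    working_directory="./summaries",
)
print(result.get("summary"), result.get("file_path"))  # key names assumed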
_build_conversation_text_from_messages
def _build_conversation_text_from_messages(self, messages: List[Any], include_summaries: bool = False):
Build conversation text from messages for summarization.
This is a shared helper method that converts messages to a formatted
conversation text string, handling tool calls, tool results, and
regular messages.
Parameters:
- messages (List[Any]): List of messages to convert.
- include_summaries (bool): Whether to include messages starting with [CONTEXT_SUMMARY]. (default: :obj:False)
Returns:
tuple[str, List[str]]: A tuple containing:
- Formatted conversation text
- List of user messages extracted from the conversation
clear_memory
def clear_memory(self, reset_summary_state: bool = True):
Clear the agent’s memory and reset to initial state.
Parameters:
- reset_summary_state (bool): Whether to reset the summary token count. Set to False when preserving summary state during summarization. Defaults to True for full memory clearing.
_generate_system_message_for_output_language
def _generate_system_message_for_output_language(self):
Returns:
BaseMessage: The new system message.
init_messages
Initializes the stored messages list with the current system
message.
update_system_message
def update_system_message(
self,
system_message: Union[BaseMessage, str],
reset_memory: bool = True
):
Update the system message.
It will reset the conversation with the new system message.
Parameters:
- system_message (Union[BaseMessage, str]): The new system message. Can be either a BaseMessage object or a string. If a string is provided, it will be converted into a BaseMessage object.
- reset_memory (bool): Whether to reinitialize conversation messages after updating the system message. Defaults to True.
append_to_system_message
def append_to_system_message(self, content: str, reset_memory: bool = True):
Append additional context to existing system message.
Parameters:
- content (str): The additional system message.
- reset_memory (bool): Whether to reinitialize conversation messages after appending additional context. Defaults to True.
reset_to_original_system_message
def reset_to_original_system_message(self):
Reset system message to original, removing any appended context.
This method reverts the agent’s system message back to its original
state, removing any workflow context or other modifications that may
have been appended. Useful for resetting agent state in multi-turn
scenarios.
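A sketch of the three system-message operations together:
agent.update_system_message("You are a code reviewer.")       # replace + reset memory
agent.append_to_system_message("Focus on concurrency bugs.")  # layer extra context
# ... multi-turn work ...
agent.reset_to_original_system_message()                      # drop appended context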
record_message
def record_message(self, message: BaseMessage):
Records the externally provided message into the agent memory as if
it were an answer of the :obj:ChatAgent from the backend. Currently,
the choice of the critic is submitted with this method.
Parameters:
- message (BaseMessage): An external message to be recorded in the memory.
def _try_format_message(self, message: BaseMessage, response_format: Type[BaseModel]):
Returns:
bool: Whether the message is formatted successfully (or no format
is needed).
def _check_tools_strict_compatibility(self):
Returns:
bool: True if all tools are strict mode compatible,
False otherwise.
def _convert_response_format_to_prompt(self, response_format: Type[BaseModel]):
Convert a Pydantic response format to a prompt instruction.
Parameters:
- response_format (Type[BaseModel]): The Pydantic model class.
Returns:
str: A prompt instruction requesting the specific format.
def _handle_response_format_with_non_strict_tools(
self,
input_message: Union[BaseMessage, str],
response_format: Optional[Type[BaseModel]] = None
):
Handle response format when tools are not strict mode compatible.
Parameters:
- input_message: The original input message.
- response_format: The requested response format.
Returns:
Tuple: (modified_message, modified_response_format,
used_prompt_formatting)
def _is_called_from_registered_toolkit(self):
Returns:
bool: True if called from a RegisteredAgentToolkit, False otherwise
_apply_prompt_based_parsing
def _apply_prompt_based_parsing(
self,
response: ModelResponse,
original_response_format: Type[BaseModel]
):
Apply manual parsing when using prompt-based formatting.
Parameters:
- response: The model response to parse.
- original_response_format: The original response format class.
def _format_response_if_needed(
self,
response: ModelResponse,
response_format: Optional[Type[BaseModel]] = None
):
Format the response if needed.
This function won't format the response in the following cases:
- The response format is None (not provided)
- The response is empty
step
def step(
self,
input_message: Union[BaseMessage, str],
response_format: Optional[Type[BaseModel]] = None
):
Executes a single step in the chat session, generating a response
to the input message.
Parameters:
- input_message (Union[BaseMessage, str]): The input message for the agent. If provided as a BaseMessage, the role is adjusted to user to indicate an external message.
- response_format (Optional[Type[BaseModel]], optional): A Pydantic model defining the expected structure of the response. Used to generate a structured response if provided. (default: :obj:None)
Returns:
Union[ChatAgentResponse, StreamingChatAgentResponse]: If stream is
False, returns a ChatAgentResponse. If stream is True, returns
a StreamingChatAgentResponse that behaves like
ChatAgentResponse but can also be iterated for
streaming updates.
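A structured-output sketch; the model class is illustrative, and reading the parsed object via msgs[0].parsed is an assumption:
from pydantic import BaseModel

class Verdict(BaseModel):
    answer: str
    confidence: float

resp = agent.step(
    "Is TCP connection setup a three-way handshake?",
    response_format=Verdict,
)
print(resp.msgs[0].parsed)  # attribute name assumed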
_step_impl
def _step_impl(
self,
input_message: Union[BaseMessage, str],
response_format: Optional[Type[BaseModel]] = None
):
Implementation of non-streaming step logic.
chat_history
_create_token_usage_tracker
def _create_token_usage_tracker(self):
Returns:
Dict[str, int]: A dictionary for tracking token usage.
_update_token_usage_tracker
def _update_token_usage_tracker(self, tracker: Dict[str, int], usage_dict: Dict[str, int]):
Updates a token usage tracker with values from a usage dictionary.
Parameters:
- tracker (Dict[str, int]): The token usage tracker to update.
- usage_dict (Dict[str, int]): The usage dictionary with new values.
_convert_to_chatagent_response
def _convert_to_chatagent_response(
self,
response: ModelResponse,
tool_call_records: List[ToolCallingRecord],
num_tokens: int,
external_tool_call_requests: Optional[List[ToolCallRequest]],
step_api_prompt_tokens: int = 0,
step_api_completion_tokens: int = 0,
step_api_total_tokens: int = 0
):
Parse the final model response into the chat agent response.
_record_final_output
def _record_final_output(self, output_messages: List[BaseMessage]):
Log final messages or warnings about multiple responses.
_get_model_response
def _get_model_response(
self,
openai_messages: List[OpenAIMessage],
current_iteration: int = 0,
response_format: Optional[Type[BaseModel]] = None,
tool_schemas: Optional[List[Dict[str, Any]]] = None,
prev_num_openai_messages: int = 0
):
Internal function for agent step model response.
_sanitize_messages_for_logging
def _sanitize_messages_for_logging(self, messages, prev_num_openai_messages: int):
Sanitize OpenAI messages for logging by replacing base64 image
data with a simple message and a link to view the image.
Parameters:
- messages (List[OpenAIMessage]): The OpenAI messages to sanitize.
- prev_num_openai_messages (int): The number of OpenAI messages logged in the previous iteration.
Returns:
List[OpenAIMessage]: The sanitized OpenAI messages.
_step_get_info
def _step_get_info(
self,
output_messages: List[BaseMessage],
finish_reasons: List[str],
usage_dict: Dict[str, int],
response_id: str,
tool_calls: List[ToolCallingRecord],
num_tokens: int,
external_tool_call_requests: Optional[List[ToolCallRequest]] = None
):
Process the output of a chat step and gather information about the
step.
This method checks for termination conditions, updates the agent’s
state, and collects information about the chat step, including tool
calls and termination reasons.
Parameters:
- output_messages (List[BaseMessage]): The messages generated in this step.
- finish_reasons (List[str]): The reasons for finishing the generation for each message.
- usage_dict (Dict[str, int]): Dictionary containing token usage information.
- response_id (str): The ID of the response from the model.
- tool_calls (List[ToolCallingRecord]): Records of function calls made during this step.
- num_tokens (int): The number of tokens used in this step.
- external_tool_call_requests (Optional[List[ToolCallRequest]]): The requests for external tool calls.
Returns:
Dict[str, Any]: A dictionary containing information about the chat
step, including termination status, reasons, and tool call
information.
Note:
This method iterates over all response terminators and checks if
any of them signal termination. If a terminator signals
termination, the agent’s state is updated accordingly, and the
termination reason is recorded.
_handle_batch_response
def _handle_batch_response(self, response: ChatCompletion):
Process a batch response from the model and extract the necessary
information.
Parameters:
- response (ChatCompletion): Model response.
Returns:
_ModelResponse: parsed model response.
_step_terminate
def _step_terminate(
self,
num_tokens: int,
tool_calls: List[ToolCallingRecord],
termination_reason: str
):
Create a response when the agent execution is terminated.
This method is called when the agent needs to terminate its execution
due to various reasons such as token limit exceeded, or other
termination conditions. It creates a response with empty messages but
includes termination information in the info dictionary.
Parameters:
- num_tokens (int): Number of tokens in the messages.
- tool_calls (List[ToolCallingRecord]): List of information objects of functions called in the current step.
- termination_reason (str): String describing the reason for termination.
Returns:
ChatAgentResponse: A response object with empty message list,
terminated flag set to True, and an info dictionary containing
termination details, token counts, and tool call information.
def _execute_tool(self, tool_call_request: ToolCallRequest):
Execute the tool with arguments following the model’s response.
Parameters:
- tool_call_request (ToolCallRequest): The tool call request.
Returns:
FunctionCallingRecord: A struct for logging information about this
function call.
def _record_tool_calling(
self,
func_name: str,
args: Dict[str, Any],
result: Any,
tool_call_id: str,
mask_output: bool = False,
extra_content: Optional[Dict[str, Any]] = None
):
Record the tool result in the memory.
Parameters:
- func_name (str): The name of the tool function called.
- args (Dict[str, Any]): The arguments passed to the tool.
- result (Any): The result returned by the tool execution.
- tool_call_id (str): A unique identifier for the tool call.
- mask_output (bool, optional): Whether to return a sanitized placeholder instead of the raw tool output. (default: :obj:False)
- extra_content (Optional[Dict[str, Any]], optional): Additional content associated with the tool call. (default: :obj:None)
Returns:
ToolCallingRecord: A struct containing information about
this tool call.
_stream
def _stream(
self,
input_message: Union[BaseMessage, str],
response_format: Optional[Type[BaseModel]] = None
):
Executes a streaming step in the chat session, yielding
intermediate responses as they are generated.
Parameters:
- input_message (Union[BaseMessage, str]): The input message for the agent.
- response_format (Optional[Type[BaseModel]], optional): A Pydantic model defining the expected structure of the response.
Yields:
ChatAgentResponse: Intermediate responses containing partial content, tool calls, and other information as they become available.
_get_token_count
def _get_token_count(self, content: str):
Get token count for content with fallback.
_warn_stream_accumulate_deprecation
def _warn_stream_accumulate_deprecation(self):
Issue deprecation warning for stream_accumulate default change.
Only warns once per agent instance, and only if the user didn’t
explicitly set stream_accumulate.
_stream_response
def _stream_response(
self,
openai_messages: List[OpenAIMessage],
num_tokens: int,
response_format: Optional[Type[BaseModel]] = None
):
Internal method to handle streaming responses with tool calls.
_process_stream_chunks_with_accumulator
def _process_stream_chunks_with_accumulator(
self,
stream: Stream[ChatCompletionChunk],
content_accumulator: StreamContentAccumulator,
accumulated_tool_calls: Dict[str, Any],
tool_call_records: List[ToolCallingRecord],
step_token_usage: Dict[str, int],
response_format: Optional[Type[BaseModel]] = None
):
Process streaming chunks with content accumulator.
def _accumulate_tool_calls(
self,
tool_call_deltas: List[Any],
accumulated_tool_calls: Dict[str, Any]
):
Accumulate tool call chunks and return True when
any tool call is complete.
Parameters:
- tool_call_deltas (List[Any]): List of tool call deltas.
- accumulated_tool_calls (Dict[str, Any]): Dictionary of accumulated tool calls.
Returns:
bool: True if any tool call is complete, False otherwise.
def _execute_tools_sync_with_status_accumulator(
self,
accumulated_tool_calls: Dict[str, Any],
tool_call_records: List[ToolCallingRecord]
):
Execute multiple tools synchronously with proper content
accumulation, using ThreadPoolExecutor for better timeout handling.
def _execute_tool_from_stream_data(self, tool_call_data: Dict[str, Any]):
Execute a tool from accumulated stream data.
Note:
The assistant message containing the tool calls must already be recorded (via _record_assistant_tool_calls_message) before calling this method. This method only records the tool result message.
_create_error_response
def _create_error_response(
self,
error_message: str,
tool_call_records: List[ToolCallingRecord]
):
Create an error response for streaming.
def _record_assistant_tool_calls_message(self, accumulated_tool_calls: Dict[str, Any], content: str = ''):
Record the assistant message that contains tool calls.
This method creates and records an assistant message that includes
the tool calls information, which is required by OpenAI’s API format.
def _record_assistant_tool_calls_from_requests(
self,
tool_call_requests: List['ToolCallRequest'],
content: str = ''
):
Record assistant message with tool calls from requests.
This method creates and records an assistant message that includes
all the tool calls from a list of ToolCallRequest objects.
Used for non-streaming tool execution to ensure proper message
sequence.
Parameters:
- tool_call_requests: List of tool call requests from model response.
- content: Optional content to include in the assistant message.
_create_streaming_response_with_accumulator
def _create_streaming_response_with_accumulator(
self,
accumulator: StreamContentAccumulator,
new_content: str,
step_token_usage: Dict[str, int],
response_id: str = '',
tool_call_records: Optional[List[ToolCallingRecord]] = None,
reasoning_delta: Optional[str] = None
):
Create a streaming response using content accumulator.
get_usage_dict
def get_usage_dict(self, output_messages: List[BaseMessage], prompt_tokens: int):
Get usage dictionary when using the stream mode.
Parameters:
- output_messages (list): List of output messages.
- prompt_tokens (int): Number of input prompt tokens.
Returns:
dict: Usage dictionary.
add_model_scheduling_strategy
def add_model_scheduling_strategy(self, name: str, strategy_fn: Callable):
Add a user-provided scheduling strategy method to ModelManager.
Parameters:
- name (str): The name of the strategy.
- strategy_fn (Callable): The scheduling strategy function.
clone
def clone(self, with_memory: bool = False):
Creates a new instance of :obj:ChatAgent with the same
configuration as the current instance.
Parameters:
- with_memory (bool): Whether to copy the memory (conversation history) to the new agent. If True, the new agent will have the same conversation history. If False, the new agent will have a fresh memory with only the system message. (default: :obj:False)
Returns:
ChatAgent: A new instance of :obj:ChatAgent with the same
configuration.
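For example, forking an agent so a parallel task inherits the conversation so far:
worker = agent.clone(with_memory=True)  # same config + copied history
fresh = agent.clone()                   # same config, fresh memory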
_clone_tools
Returns:
Tuple containing:
- List of cloned tools/functions
- List of RegisteredAgentToolkit instances that need registration
repr
Returns:
str: The string representation of the :obj:ChatAgent.
to_mcp
def to_mcp(
self,
name: str = 'CAMEL-ChatAgent',
description: str = 'A helpful assistant using the CAMEL AI framework.',
dependencies: Optional[List[str]] = None,
host: str = 'localhost',
port: int = 8000
):
Expose this ChatAgent as an MCP server.
Parameters:
- name (str): Name of the MCP server. (default: :obj:CAMEL-ChatAgent)
- description (str): Description of the agent. If None, a generic description is used. (default: :obj:A helpful assistant using the CAMEL AI framework.)
- dependencies (Optional[List[str]]): Additional dependencies for the MCP server. (default: :obj:None)
- host (str): Host to bind to for HTTP transport. (default: :obj:localhost)
- port (int): Port to bind to for HTTP transport. (default: :obj:8000)
Returns:
FastMCP: An MCP server instance that can be run.
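A serving sketch; run() is assumed from FastMCP's conventional entry point:
mcp_server = agent.to_mcp(
    name="docs-helper",
    description="Answers questions about the CAMEL codebase.",
    host="localhost",
    port=8000,
)
mcp_server.run()  # entry-point name assumed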