# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
from __future__ import annotations
import json
import logging
import textwrap
from collections import defaultdict
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Set,
Type,
Union,
)
from openai import (
AsyncStream,
Stream,
)
from pydantic import BaseModel, ValidationError
from camel.agents._types import ModelResponse, ToolCallRequest
from camel.agents._utils import (
convert_to_function_tool,
convert_to_schema,
get_info_dict,
handle_logprobs,
safe_model_dump,
)
from camel.agents.base import BaseAgent
from camel.memories import (
AgentMemory,
ChatHistoryMemory,
MemoryRecord,
ScoreBasedContextCreator,
)
from camel.messages import BaseMessage, FunctionCallingMessage, OpenAIMessage
from camel.models import (
BaseModelBackend,
ModelFactory,
ModelManager,
ModelProcessingError,
)
from camel.prompts import TextPrompt
from camel.responses import ChatAgentResponse
from camel.toolkits import FunctionTool
from camel.types import (
ChatCompletion,
ChatCompletionChunk,
ModelPlatformType,
ModelType,
OpenAIBackendRole,
RoleType,
)
from camel.types.agents import ToolCallingRecord
from camel.utils import get_model_encoding
if TYPE_CHECKING:
from camel.terminators import ResponseTerminator
logger = logging.getLogger(__name__)
# AgentOps decorator setting
try:
import os
if os.getenv("AGENTOPS_API_KEY") is not None:
from agentops import track_agent
else:
raise ImportError
except (ImportError, AttributeError):
from camel.utils import track_agent
SIMPLE_FORMAT_PROMPT = TextPrompt(
textwrap.dedent(
"""\
Please format the following content:
{content}
"""
)
)
@track_agent(name="ChatAgent")
class ChatAgent(BaseAgent):
r"""Class for managing conversations of CAMEL Chat Agents.
Args:
system_message (Union[BaseMessage, str], optional): The system message
for the chat agent.
model (BaseModelBackend, optional): The model backend to use for
generating responses. (default: :obj:`ModelPlatformType.DEFAULT`
with `ModelType.DEFAULT`)
memory (AgentMemory, optional): The agent memory for managing chat
messages. If `None`, a :obj:`ChatHistoryMemory` will be used.
(default: :obj:`None`)
message_window_size (int, optional): The maximum number of previous
messages to include in the context window. If `None`, no windowing
is performed. (default: :obj:`None`)
token_limit (int, optional): The maximum number of tokens in a context.
The context will be automatically pruned to fulfill the limitation.
If `None`, it will be set according to the backend model.
(default: :obj:`None`)
output_language (str, optional): The language to be output by the
agent. (default: :obj:`None`)
tools (Optional[List[Union[FunctionTool, Callable]]], optional): List
of available :obj:`FunctionTool` or :obj:`Callable`. (default:
:obj:`None`)
external_tools (Optional[List[Union[FunctionTool, Callable,
Dict[str, Any]]]], optional): List of external tools
(:obj:`FunctionTool` or :obj:`Callable` or :obj:`Dict[str, Any]`)
bound to one chat agent. When these tools are called, the agent will
directly return the request instead of processing it.
(default: :obj:`None`)
response_terminators (List[ResponseTerminator], optional): List of
:obj:`ResponseTerminator` bound to one chat agent.
(default: :obj:`None`)
scheduling_strategy (str): Name of the function that defines how to
select the next model in ModelManager.
(default: :obj:`"round_robin"`)
single_iteration (bool): Whether to let the agent perform only one
model call at each step. (default: :obj:`False`)
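Example:
    A minimal usage sketch (assumes a default model backend is
    available, e.g. an OpenAI-compatible API key in the environment)::

        from camel.agents import ChatAgent

        agent = ChatAgent(system_message="You are a helpful assistant.")
        response = agent.step("Summarize CAMEL agents in one sentence.")
        print(response.msgs[0].content)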
"""
def __init__(
self,
system_message: Optional[Union[BaseMessage, str]] = None,
model: Optional[
Union[BaseModelBackend, List[BaseModelBackend]]
] = None,
memory: Optional[AgentMemory] = None,
message_window_size: Optional[int] = None,
token_limit: Optional[int] = None,
output_language: Optional[str] = None,
tools: Optional[List[Union[FunctionTool, Callable]]] = None,
external_tools: Optional[
List[Union[FunctionTool, Callable, Dict[str, Any]]]
] = None,
response_terminators: Optional[List[ResponseTerminator]] = None,
scheduling_strategy: str = "round_robin",
single_iteration: bool = False,
) -> None:
# Set up model backend
self.model_backend = ModelManager(
(
model
if model is not None
else ModelFactory.create(
model_platform=ModelPlatformType.DEFAULT,
model_type=ModelType.DEFAULT,
)
),
scheduling_strategy=scheduling_strategy,
)
self.model_type = self.model_backend.model_type
# Set up memory
context_creator = ScoreBasedContextCreator(
self.model_backend.token_counter,
token_limit or self.model_backend.token_limit,
)
self.memory: AgentMemory = memory or ChatHistoryMemory(
context_creator, window_size=message_window_size
)
# Set up system message and initialize messages
self._original_system_message = (
BaseMessage.make_assistant_message(
role_name="Assistant", content=system_message
)
if isinstance(system_message, str)
else system_message
)
self._output_language = output_language
self._system_message = (
self._generate_system_message_for_output_language()
)
self.init_messages()
# Set up role name and role type
self.role_name: str = (
getattr(self.system_message, "role_name", None) or "assistant"
)
self.role_type: RoleType = (
getattr(self.system_message, "role_type", None)
or RoleType.ASSISTANT
)
# Set up tools
self._internal_tools = {
tool.get_function_name(): tool
for tool in [
convert_to_function_tool(tool) for tool in (tools or [])
]
}
self._external_tool_schemas = {
tool_schema["function"]["name"]: tool_schema
for tool_schema in [
convert_to_schema(tool) for tool in (external_tools or [])
]
}
# Set up other properties
self.terminated = False
self.response_terminators = response_terminators or []
self.single_iteration = single_iteration
def reset(self):
r"""Resets the :obj:`ChatAgent` to its initial state."""
self.terminated = False
self.init_messages()
for terminator in self.response_terminators:
terminator.reset()
@property
def system_message(self) -> Optional[BaseMessage]:
r"""Returns the system message for the agent."""
return self._system_message
@property
def tool_dict(self) -> Dict[str, FunctionTool]:
r"""Returns a dictionary of internal tools."""
return self._internal_tools
@property
def output_language(self) -> Optional[str]:
r"""Returns the output language for the agent."""
return self._output_language
@output_language.setter
def output_language(self, value: str) -> None:
r"""Set the output language for the agent.
Note that this will clear the message history.
"""
self._output_language = value
self._system_message = (
self._generate_system_message_for_output_language()
)
self.init_messages()
def _get_full_tool_schemas(self) -> List[Dict[str, Any]]:
r"""Returns a list of tool schemas of all tools, including internal
and external tools.
"""
return list(self._external_tool_schemas.values()) + [
func_tool.get_openai_tool_schema()
for func_tool in self._internal_tools.values()
]
def _get_external_tool_names(self) -> Set[str]:
r"""Returns a set of external tool names."""
return set(self._external_tool_schemas.keys())
def update_memory(
self, message: BaseMessage, role: OpenAIBackendRole
) -> None:
r"""Updates the agent memory with a new message.
Args:
message (BaseMessage): The new message to add to the stored
messages.
role (OpenAIBackendRole): The backend role type.
"""
self.memory.write_record(
MemoryRecord(message=message, role_at_backend=role)
)
def _generate_system_message_for_output_language(
self,
) -> Optional[BaseMessage]:
r"""Generate a new system message with the output language prompt.
The output language determines the language in which the output text
should be generated.
Returns:
BaseMessage: The new system message.
"""
if not self._output_language:
return self._original_system_message
language_prompt = (
"\nRegardless of the input language, "
f"you must output text in {self._output_language}."
)
if self._original_system_message is not None:
content = self._original_system_message.content + language_prompt
return self._original_system_message.create_new_instance(content)
else:
return BaseMessage.make_assistant_message(
role_name="Assistant",
content=language_prompt,
)
def init_messages(self) -> None:
r"""Initializes the stored messages list with the current system
message.
"""
self.memory.clear()
if self.system_message is not None:
self.update_memory(self.system_message, OpenAIBackendRole.SYSTEM)
def record_message(self, message: BaseMessage) -> None:
r"""Records the externally provided message into the agent memory as if
it were an answer of the :obj:`ChatAgent` from the backend. Currently,
the choice of the critic is submitted with this method.
Args:
message (BaseMessage): An external message to be recorded in the
memory.
"""
self.update_memory(message, OpenAIBackendRole.ASSISTANT)
def _try_format_message(
self, message: BaseMessage, response_format: Type[BaseModel]
) -> bool:
r"""Try to format the message if needed.
Returns:
bool: Whether the message is formatted successfully (or no format
is needed).
"""
if message.parsed:
return True
try:
message.parsed = response_format.model_validate_json(
message.content
)
return True
except ValidationError:
return False
def _format_response_if_needed(
self,
response: ModelResponse,
response_format: Optional[Type[BaseModel]] = None,
) -> None:
r"""Format the response if needed.
This function won't format the response under the following cases:
1. The response format is None (not provided)
2. The response is empty
"""
if response_format is None:
return
for message in response.output_messages:
if self._try_format_message(message, response_format):
continue
prompt = SIMPLE_FORMAT_PROMPT.format(content=message.content)
openai_message: OpenAIMessage = {"role": "user", "content": prompt}
# Explicitly set the tools to empty list to avoid calling tools
response = self._get_model_response(
[openai_message], 0, response_format, []
)
message.content = response.output_messages[0].content
if not self._try_format_message(message, response_format):
logger.warning(f"Failed to parse response: {message.content}")
async def _aformat_response_if_needed(
self,
response: ModelResponse,
response_format: Optional[Type[BaseModel]] = None,
) -> None:
r"""Format the response if needed."""
if response_format is None:
return
for message in response.output_messages:
self._try_format_message(message, response_format)
if message.parsed:
continue
prompt = SIMPLE_FORMAT_PROMPT.format(content=message.content)
openai_message: OpenAIMessage = {"role": "user", "content": prompt}
response = await self._aget_model_response(
[openai_message], 0, response_format, []
)
message.content = response.output_messages[0].content
self._try_format_message(message, response_format)
def step(
self,
input_message: Union[BaseMessage, str],
response_format: Optional[Type[BaseModel]] = None,
) -> ChatAgentResponse:
r"""Executes a single step in the chat session, generating a response
to the input message.
Args:
input_message (Union[BaseMessage, str]): The input message for the
agent. If provided as a BaseMessage, the `role` is adjusted to
`user` to indicate an external message.
response_format (Optional[Type[BaseModel]], optional): A Pydantic
model defining the expected structure of the response. Used to
generate a structured response if provided. (default:
:obj:`None`)
Returns:
ChatAgentResponse: Contains output messages, a termination status
flag, and session information.
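Example:
    An illustrative sketch of requesting structured output; the
    ``CityInfo`` model is hypothetical and ``agent`` is assumed to be
    an already constructed :obj:`ChatAgent`::

        from pydantic import BaseModel

        class CityInfo(BaseModel):
            name: str
            country: str

        response = agent.step(
            "Give me basic facts about Paris.",
            response_format=CityInfo,
        )
        print(response.msgs[0].parsed)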
"""
# Convert input message to BaseMessage if necessary
if isinstance(input_message, str):
input_message = BaseMessage.make_user_message(
role_name="User", content=input_message
)
# Add user input to memory
self.update_memory(input_message, OpenAIBackendRole.USER)
tool_call_records: List[ToolCallingRecord] = []
external_tool_call_request: Optional[ToolCallRequest] = None
while True:
try:
openai_messages, num_tokens = self.memory.get_context()
except RuntimeError as e:
return self._step_token_exceed(
e.args[1], tool_call_records, "max_tokens_exceeded"
)
# Get response from model backend
response = self._get_model_response(
openai_messages,
num_tokens,
response_format,
self._get_full_tool_schemas(),
)
if self.single_iteration:
break
if tool_call_request := response.tool_call_request:
if tool_call_request.tool_name in self._external_tool_schemas:
external_tool_call_request = tool_call_request
break
tool_call_records.append(self._execute_tool(tool_call_request))
continue
break
self._format_response_if_needed(response, response_format)
self._record_final_output(response.output_messages)
return self._convert_to_chatagent_response(
response, tool_call_records, num_tokens, external_tool_call_request
)
@property
def chat_history(self) -> List[OpenAIMessage]:
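r"""Returns the current conversation context retrieved from the agent
memory as a list of OpenAI-compatible messages."""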
openai_messages, _ = self.memory.get_context()
return openai_messages
async def astep(
self,
input_message: Union[BaseMessage, str],
response_format: Optional[Type[BaseModel]] = None,
) -> ChatAgentResponse:
r"""Performs a single step in the chat session by generating a response
to the input message. This agent step can perform asynchronous tool calls.
Args:
input_message (Union[BaseMessage, str]): The input message for the
agent. For BaseMessage input, its `role` field (the role at the
backend) may be either `user` or `assistant`, but it is set to
`user` regardless, since any incoming message is external to
this agent. For str input, the `role_name` is set to `User`.
response_format (Optional[Type[BaseModel]], optional): A pydantic
model class that includes value types and field descriptions
used to generate a structured response by LLM. This schema
helps in defining the expected output format. (default:
:obj:`None`)
Returns:
ChatAgentResponse: A struct containing the output messages,
a boolean indicating whether the chat session has terminated,
and information about the chat session.
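Example:
    A minimal async usage sketch; ``agent`` is assumed to be an
    already constructed :obj:`ChatAgent`::

        import asyncio

        async def main():
            response = await agent.astep("What is 2 + 2?")
            print(response.msgs[0].content)

        asyncio.run(main())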
"""
if isinstance(input_message, str):
input_message = BaseMessage.make_user_message(
role_name="User", content=input_message
)
self.update_memory(input_message, OpenAIBackendRole.USER)
tool_call_records: List[ToolCallingRecord] = []
external_tool_call_request: Optional[ToolCallRequest] = None
while True:
try:
openai_messages, num_tokens = self.memory.get_context()
except RuntimeError as e:
return self._step_token_exceed(
e.args[1], tool_call_records, "max_tokens_exceeded"
)
response = await self._aget_model_response(
openai_messages,
num_tokens,
response_format,
self._get_full_tool_schemas(),
)
if self.single_iteration:
break
if tool_call_request := response.tool_call_request:
if tool_call_request.tool_name in self._external_tool_schemas:
external_tool_call_request = tool_call_request
break
tool_call_record = await self._aexecute_tool(tool_call_request)
tool_call_records.append(tool_call_record)
continue
break
await self._aformat_response_if_needed(response, response_format)
self._record_final_output(response.output_messages)
return self._convert_to_chatagent_response(
response, tool_call_records, num_tokens, external_tool_call_request
)
def _convert_to_chatagent_response(
self,
response: ModelResponse,
tool_call_records: List[ToolCallingRecord],
num_tokens: int,
external_tool_call_request: Optional[ToolCallRequest],
) -> ChatAgentResponse:
r"""Parse the final model response into the chat agent response."""
info = self._step_get_info(
response.output_messages,
response.finish_reasons,
response.usage_dict,
response.response_id,
tool_call_records,
num_tokens,
external_tool_call_request,
)
return ChatAgentResponse(
msgs=response.output_messages,
terminated=self.terminated,
info=info,
)
def _record_final_output(self, output_messages: List[BaseMessage]) -> None:
r"""Log final messages or warnings about multiple responses."""
if len(output_messages) == 1:
self.record_message(output_messages[0])
else:
logger.warning(
"Multiple messages returned in `step()`. Record "
"selected message manually using `record_message()`."
)
def _get_model_response(
self,
openai_messages: List[OpenAIMessage],
num_tokens: int,
response_format: Optional[Type[BaseModel]] = None,
tool_schemas: Optional[List[Dict[str, Any]]] = None,
) -> ModelResponse:
r"""Internal function for agent step model response."""
response = None
try:
response = self.model_backend.run(
openai_messages, response_format, tool_schemas or None
)
except Exception as exc:
logger.error(
f"An error occurred while running model "
f"{self.model_backend.model_type}, "
f"index: {self.model_backend.current_model_index}",
exc_info=exc,
)
if not response:
raise ModelProcessingError(
"Unable to process messages: none of the provided models "
"run succesfully."
)
logger.info(
f"Model {self.model_backend.model_type}, "
f"index {self.model_backend.current_model_index}, "
f"processed these messages: {openai_messages}"
)
if isinstance(response, ChatCompletion):
return self._handle_batch_response(response)
else:
return self._handle_stream_response(response, num_tokens)
async def _aget_model_response(
self,
openai_messages: List[OpenAIMessage],
num_tokens: int,
response_format: Optional[Type[BaseModel]] = None,
tool_schemas: Optional[List[Dict[str, Any]]] = None,
) -> ModelResponse:
r"""Internal function for agent step model response."""
response = None
try:
response = await self.model_backend.arun(
openai_messages, response_format, tool_schemas or None
)
except Exception as exc:
logger.error(
f"An error occurred while running model "
f"{self.model_backend.model_type}, "
f"index: {self.model_backend.current_model_index}",
exc_info=exc,
)
if not response:
raise ModelProcessingError(
"Unable to process messages: none of the provided models "
"run succesfully."
)
logger.info(
f"Model {self.model_backend.model_type}, "
f"index {self.model_backend.current_model_index}, "
f"processed these messages: {openai_messages}"
)
if isinstance(response, ChatCompletion):
return self._handle_batch_response(response)
else:
return await self._ahandle_stream_response(response, num_tokens)
def _step_get_info(
self,
output_messages: List[BaseMessage],
finish_reasons: List[str],
usage_dict: Dict[str, int],
response_id: str,
tool_calls: List[ToolCallingRecord],
num_tokens: int,
external_tool_call_request: Optional[ToolCallRequest] = None,
) -> Dict[str, Any]:
r"""Process the output of a chat step and gather information about the
step.
This method checks for termination conditions, updates the agent's
state, and collects information about the chat step, including tool
calls and termination reasons.
Args:
output_messages (List[BaseMessage]): The messages generated in
this step.
finish_reasons (List[str]): The reasons for finishing the
generation for each message.
usage_dict (Dict[str, int]): Dictionary containing token usage
information.
response_id (str): The ID of the response from the model.
tool_calls (List[ToolCallingRecord]): Records of function calls
made during this step.
num_tokens (int): The number of tokens used in this step.
external_tool_call_request (Optional[ToolCallRequest]): The
request for external tool call.
Returns:
Dict[str, Any]: A dictionary containing information about the chat
step, including termination status, reasons, and tool call
information.
Note:
This method iterates over all response terminators and checks if
any of them signal termination. If a terminator signals
termination, the agent's state is updated accordingly, and the
termination reason is recorded.
"""
termination = [
terminator.is_terminated(output_messages)
for terminator in self.response_terminators
]
# Terminate the agent if any of the terminator terminates
self.terminated, termination_reason = next(
(
(terminated, termination_reason)
for terminated, termination_reason in termination
if terminated
),
(False, None),
)
# For now only retain the first termination reason
if self.terminated and termination_reason is not None:
finish_reasons = [termination_reason] * len(finish_reasons)
return get_info_dict(
response_id,
usage_dict,
finish_reasons,
num_tokens,
tool_calls,
external_tool_call_request,
)
def _handle_batch_response(
self, response: ChatCompletion
) -> ModelResponse:
r"""Process a batch response from the model and extract the necessary
information.
Args:
response (ChatCompletion): Model response.
Returns:
ModelResponse: Parsed model response.
"""
output_messages: List[BaseMessage] = []
for choice in response.choices:
meta_dict = {}
if logprobs_info := handle_logprobs(choice):
meta_dict["logprobs_info"] = logprobs_info
chat_message = BaseMessage(
role_name=self.role_name,
role_type=self.role_type,
meta_dict=meta_dict,
content=choice.message.content or "",
parsed=getattr(choice.message, "parsed", None),
)
output_messages.append(chat_message)
finish_reasons = [
str(choice.finish_reason) for choice in response.choices
]
usage = {}
if response.usage is not None:
usage = safe_model_dump(response.usage)
tool_call_request: Optional[ToolCallRequest] = None
if tool_calls := response.choices[0].message.tool_calls:
tool_name = tool_calls[0].function.name
tool_call_id = tool_calls[0].id
args = json.loads(tool_calls[0].function.arguments)
tool_call_request = ToolCallRequest(
tool_name=tool_name, args=args, tool_call_id=tool_call_id
)
return ModelResponse(
response=response,
tool_call_request=tool_call_request,
output_messages=output_messages,
finish_reasons=finish_reasons,
usage_dict=usage,
response_id=response.id or "",
)
def _handle_stream_response(
self,
response: Stream[ChatCompletionChunk],
prompt_tokens: int,
) -> ModelResponse:
r"""Process a stream response from the model and extract the necessary
information.
Args:
response (Stream[ChatCompletionChunk]): Model response stream.
prompt_tokens (int): Number of input prompt tokens.
Returns:
ModelResponse: Parsed model response.
"""
content_dict: defaultdict = defaultdict(lambda: "")
finish_reasons_dict: defaultdict = defaultdict(lambda: "")
output_messages: List[BaseMessage] = []
response_id: str = ""
# All choices in one response share one role
for chunk in response:
response_id = chunk.id
self._handle_chunk(
chunk, content_dict, finish_reasons_dict, output_messages
)
finish_reasons = [
finish_reasons_dict[i] for i in range(len(finish_reasons_dict))
]
usage_dict = self.get_usage_dict(output_messages, prompt_tokens)
# TODO: Handle tool calls
return ModelResponse(
response=response,
tool_call_request=None,
output_messages=output_messages,
finish_reasons=finish_reasons,
usage_dict=usage_dict,
response_id=response_id,
)
async def _ahandle_stream_response(
self,
response: AsyncStream[ChatCompletionChunk],
prompt_tokens: int,
) -> ModelResponse:
r"""Process a stream response from the model and extract the necessary
information.
Args:
response (AsyncStream[ChatCompletionChunk]): Model response stream.
prompt_tokens (int): Number of input prompt tokens.
Returns:
ModelResponse: Parsed model response.
"""
content_dict: defaultdict = defaultdict(lambda: "")
finish_reasons_dict: defaultdict = defaultdict(lambda: "")
output_messages: List[BaseMessage] = []
response_id: str = ""
# All choices in one response share one role
async for chunk in response:
response_id = chunk.id
self._handle_chunk(
chunk, content_dict, finish_reasons_dict, output_messages
)
finish_reasons = [
finish_reasons_dict[i] for i in range(len(finish_reasons_dict))
]
usage_dict = self.get_usage_dict(output_messages, prompt_tokens)
# TODO: Handle tool calls
return ModelResponse(
response=response,
tool_call_request=None,
output_messages=output_messages,
finish_reasons=finish_reasons,
usage_dict=usage_dict,
response_id=response_id,
)
def _handle_chunk(
self,
chunk: ChatCompletionChunk,
content_dict: defaultdict,
finish_reasons_dict: defaultdict,
output_messages: List[BaseMessage],
) -> None:
r"""Handle a chunk of the model response."""
for choice in chunk.choices:
index = choice.index
delta = choice.delta
if delta.content is not None:
content_dict[index] += delta.content
if not choice.finish_reason:
continue
finish_reasons_dict[index] = choice.finish_reason
chat_message = BaseMessage(
role_name=self.role_name,
role_type=self.role_type,
meta_dict=dict(),
content=content_dict[index],
)
output_messages.append(chat_message)
def _step_token_exceed(
self,
num_tokens: int,
tool_calls: List[ToolCallingRecord],
termination_reason: str,
) -> ChatAgentResponse:
r"""Return trivial response containing number of tokens and information
of called functions when the number of tokens exceeds.
Args:
num_tokens (int): Number of tokens in the messages.
tool_calls (List[ToolCallingRecord]): List of information
objects of functions called in the current step.
termination_reason (str): String of termination reason.
Returns:
ChatAgentResponse: The struct containing trivial outputs and
information about token number and called functions.
"""
self.terminated = True
info = get_info_dict(
None,
None,
[termination_reason],
num_tokens,
tool_calls,
)
return ChatAgentResponse(
msgs=[],
terminated=self.terminated,
info=info,
)
def _execute_tool(
self,
tool_call_request: ToolCallRequest,
) -> ToolCallingRecord:
r"""Execute the tool with arguments following the model's response.
Args:
tool_call_request (ToolCallRequest): The tool call request.
Returns:
ToolCallingRecord: A struct for logging information about this
tool call.
"""
func_name = tool_call_request.tool_name
args = tool_call_request.args
tool_call_id = tool_call_request.tool_call_id
tool = self._internal_tools[func_name]
result = tool(**args)
return self._record_tool_calling(func_name, args, result, tool_call_id)
async def _aexecute_tool(
self,
tool_call_request: ToolCallRequest,
) -> ToolCallingRecord:
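r"""Async counterpart of :obj:`_execute_tool`: awaits the tool's
asynchronous call and records the result as a tool calling record."""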
func_name = tool_call_request.tool_name
args = tool_call_request.args
tool_call_id = tool_call_request.tool_call_id
tool = self._internal_tools[func_name]
result = await tool.async_call(**args)
return self._record_tool_calling(func_name, args, result, tool_call_id)
def _record_tool_calling(
self,
func_name: str,
args: Dict[str, Any],
result: Any,
tool_call_id: str,
) -> ToolCallingRecord:
r"""Record the tool calling information in the memory, and return the
tool calling record.
"""
assist_msg = FunctionCallingMessage(
role_name=self.role_name,
role_type=self.role_type,
meta_dict=None,
content="",
func_name=func_name,
args=args,
tool_call_id=tool_call_id,
)
func_msg = FunctionCallingMessage(
role_name=self.role_name,
role_type=self.role_type,
meta_dict=None,
content="",
func_name=func_name,
result=result,
tool_call_id=tool_call_id,
)
self.update_memory(assist_msg, OpenAIBackendRole.ASSISTANT)
self.update_memory(func_msg, OpenAIBackendRole.FUNCTION)
# Record information about this tool call
tool_record = ToolCallingRecord(
tool_name=func_name,
args=args,
result=result,
tool_call_id=tool_call_id,
)
return tool_record
def get_usage_dict(
self, output_messages: List[BaseMessage], prompt_tokens: int
) -> Dict[str, int]:
r"""Get usage dictionary when using the stream mode.
Args:
output_messages (list): List of output messages.
prompt_tokens (int): Number of input prompt tokens.
Returns:
dict: Usage dictionary.
"""
encoding = get_model_encoding(self.model_type.value_for_tiktoken)
completion_tokens = sum(
len(encoding.encode(message.content))
for message in output_messages
)
return dict(
completion_tokens=completion_tokens,
prompt_tokens=prompt_tokens,
total_tokens=completion_tokens + prompt_tokens,
)
def add_model_scheduling_strategy(self, name: str, strategy_fn: Callable):
r"""Add a scheduling strategy method provided by user to ModelManger.
Args:
name (str): The name of the strategy.
strategy_fn (Callable): The scheduling strategy function.
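Example:
    An illustrative sketch; the exact signature expected by
    :obj:`ModelManager.add_strategy` is an assumption here (a callable
    receiving the manager and returning one of its models)::

        def always_first(manager):
            # Hypothetical strategy: always pick the first backend.
            return manager.models[0]

        agent.add_model_scheduling_strategy("always_first", always_first)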
"""
self.model_backend.add_strategy(name, strategy_fn)
def __repr__(self) -> str:
r"""Returns a string representation of the :obj:`ChatAgent`.
Returns:
str: The string representation of the :obj:`ChatAgent`.
"""
return (
f"ChatAgent({self.role_name}, {self.role_type}, {self.model_type})"
)