AgentMessage

class AgentMessage(BaseMessage):

Represents a message between agents, extending BaseMessage.

This class extends the standard CAMEL BaseMessage with additional attributes needed for inter-agent communication.

__init__

def __init__(
    self,
    message_id: str,
    sender_id: str,
    receiver_id: str,
    content: str,
    timestamp: float,
    reply_to: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
):

AgentCommunicationToolkit

class AgentCommunicationToolkit(BaseToolkit):

A toolkit for agent-to-agent communication in multi-agent systems.

Enables agents to send messages to each other with message history tracking and integration with the CAMEL workforce system.

Parameters:

  • agents (Optional[Dict[str, ChatAgent]]): Dictionary mapping agent IDs to ChatAgent instances. (default: None)
  • timeout (Optional[float]): Maximum execution time for operations in seconds. (default: None)
  • max_message_history (int): Maximum messages to keep per agent. (default: 100)
  • get_response (bool): Whether to get responses from receiving agents by default. (default: False)

__init__

def __init__(
    self,
    agents: Optional[Dict[str, 'ChatAgent']] = None,
    timeout: Optional[float] = None,
    max_message_history: int = 100,
    get_response: bool = False
):

register_agent

def register_agent(self, agent_id: str, agent: 'ChatAgent'):

Register a new agent for communication.

Parameters:

  • agent_id (str): Unique identifier for the agent.
  • agent (ChatAgent): The ChatAgent instance to register.

Returns:

str: Confirmation message with registration details.

_find_agent_id

def _find_agent_id(self, agent_id: str):

Find agent ID with flexible matching (case-insensitive, partial matches).

send_message

def send_message(
    self,
    message: str,
    receiver_id: str,
    sender_id: str = 'system',
    reply_to: Optional[str] = None,
    metadata_json: Optional[str] = None
):

Sends a message to a specific agent.

This function allows one agent to communicate directly with another by sending a message. The toolkit’s get_response setting determines whether to get an immediate response or just send a notification. To get the receiver_id of the agent you want to communicate with, you can use the list_available_agents tool.

Parameters:

  • message (str): The content of the message to send.
  • receiver_id (str): The unique identifier of the agent to receive the message. Use list_available_agents() to find the ID of the agent you want to talk to.
  • sender_id (str): The unique identifier of the agent sending the message. This is typically your agent’s ID. (default: "system")
  • reply_to (Optional[str]): The ID of a previous message this new message is a reply to. This helps create conversation threads. (default: None)
  • metadata_json (Optional[str]): A JSON string containing extra information about the message. (default: None)

Returns:

str: A confirmation that the message was sent. If the toolkit’s get_response setting is True, it also includes the response from the receiving agent.

_deliver_message

def _deliver_message(self, message: AgentMessage, get_response: bool):

Deliver a message to the target agent, optionally getting a response.

broadcast_message

def broadcast_message(
    self,
    message: str,
    sender_id: str = 'system',
    exclude_agents: Optional[List[str]] = None
):

Sends a message to all other agents in the system.

This function is useful for making announcements or sending information that every agent needs. The message will be sent to all registered agents except for the sender and any agents specified in the exclude_agents list.

Parameters:

  • message (str): The content of the message to broadcast.
  • sender_id (str): The unique identifier of the agent sending the message. This is typically your agent’s ID. (default: "system")
  • exclude_agents (Optional[List[str]]): A list of agent IDs to exclude from the broadcast. The sender is automatically excluded. (default: None)

Returns:

str: A summary of the broadcast, showing which agents received the message and their responses.

get_message_history

def get_message_history(self, agent_id: str, limit: Optional[int] = None):

Retrieves the message history for a specific agent.

This function allows you to see the messages sent and received by a particular agent, which can be useful for understanding past conversations. To get the agent_id for another agent, use the list_available_agents tool. You can also use this tool to get your own message history by providing your agent ID.

Parameters:

  • agent_id (str): The unique identifier of the agent whose message history you want to retrieve. Use list_available_agents() to find available agent IDs.
  • limit (Optional[int]): The maximum number of recent messages to return. If not specified, it will return all messages up to the system’s limit. (default: None)

Returns:

str: A formatted string containing the message history for the specified agent, or an error if the agent is not found.

get_conversation_thread

def get_conversation_thread(self, message_id: str):

Retrieves a full conversation thread based on a message ID.

When you send a message that is a reply to another, it creates a conversation thread. This function lets you retrieve all messages that are part of that thread. You can find message IDs in the message history or from the output of the send_message tool.

Parameters:

  • message_id (str): The unique identifier of any message within the conversation thread you want to retrieve.

Returns:

str: A formatted string containing all messages in the conversation, sorted by time, or an error if the thread is not found.

list_available_agents

def list_available_agents(self):

Lists the IDs and status of all available agents.

Returns:

str: A formatted string listing the IDs and status of all available agents.

remove_agent

def remove_agent(self, agent_id: str):

Remove an agent from the communication registry.

Parameters:

  • agent_id (str): Unique identifier of the agent to remove.

Returns:

str: Confirmation message with cleanup details.

get_toolkit_status

def get_toolkit_status(self):

Reports detailed status information for the toolkit, including metrics and queue state.

Returns:

str: Detailed status information including metrics and queue state.

_add_to_history

def _add_to_history(self, message: AgentMessage):

Add a message to the history with size management.

_update_conversation_thread

def _update_conversation_thread(self, parent_id: str, new_id: str):

Update conversation threading.

get_tools

def get_tools(self):

Returns a list of FunctionTool objects representing the communication functions in the toolkit.

Returns:

List[FunctionTool]: A list of FunctionTool objects for agent communication and management.