camel.societies.workforce package

Submodules#

camel.societies.workforce.base module#

class camel.societies.workforce.base.BaseNode(description: str)[source]#

Bases: ABC

Base class for all nodes in the workforce.

Parameters:

description (str) – Description of the node.

reset(*args: Any, **kwargs: Any) Any[source]#

Resets the node to its initial state.

abstract set_channel(channel: TaskChannel)[source]#

Sets the channel for the node.

abstract async start()[source]#

Start the node.

abstract stop()[source]#

Stop the node.
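
The interface above can be pictured with a small stand-in. This is a minimal sketch built only on the standard library, not CAMEL's source: `SketchNode` and `EchoNode` are hypothetical names, and the `_channel` and `_running` attributes are assumptions made for illustration.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class SketchNode(ABC):
    """Hypothetical stand-in mirroring the documented BaseNode interface."""

    def __init__(self, description: str) -> None:
        self.description = description
        self._channel: Any = None  # assumed attribute name
        self._running = False      # assumed attribute name

    def reset(self, *args: Any, **kwargs: Any) -> Any:
        # Return the node to its initial state.
        self._channel = None
        self._running = False

    @abstractmethod
    def set_channel(self, channel: Any) -> None:
        """Attach the channel used to exchange tasks."""

    @abstractmethod
    async def start(self) -> None:
        """Start the node."""

    @abstractmethod
    def stop(self) -> None:
        """Stop the node."""


class EchoNode(SketchNode):
    """Smallest concrete node: it only tracks its own state."""

    def set_channel(self, channel: Any) -> None:
        self._channel = channel

    async def start(self) -> None:
        self._running = True

    def stop(self) -> None:
        self._running = False


node = EchoNode("demo node")
node.set_channel(object())  # any object stands in for a TaskChannel here
asyncio.run(node.start())
running_after_start = node._running
node.stop()
```

Because the three lifecycle methods are abstract, instantiating `SketchNode` directly raises `TypeError`; only subclasses that implement all of them can be created.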

camel.societies.workforce.prompts module#

camel.societies.workforce.role_playing_worker module#

class camel.societies.workforce.role_playing_worker.RolePlayingWorker(description: str, assistant_role_name: str, user_role_name: str, assistant_agent_kwargs: Dict | None = None, user_agent_kwargs: Dict | None = None, chat_turn_limit: int = 3)[source]#

Bases: Worker

A worker node that runs a role-playing session between an assistant agent and a user agent.

Parameters:
  • description (str) – Description of the node.

  • assistant_role_name (str) – The role name of the assistant agent.

  • user_role_name (str) – The role name of the user agent.

  • assistant_agent_kwargs (Optional[Dict], optional) – The keyword arguments to initialize the assistant agent in the role playing, like the model name, etc. Defaults to None.

  • user_agent_kwargs (Optional[Dict], optional) – The keyword arguments to initialize the user agent in the role playing, like the model name, etc. Defaults to None.

  • chat_turn_limit (int, optional) – The maximum number of chat turns in the role playing. Defaults to 3.

camel.societies.workforce.single_agent_worker module#

class camel.societies.workforce.single_agent_worker.SingleAgentWorker(description: str, worker: ChatAgent)[source]#

Bases: Worker

A worker node that consists of a single agent.

Parameters:
  • description (str) – Description of the node.

  • worker (ChatAgent) – Worker of the node. A single agent.

reset() Any[source]#

Resets the worker to its initial state.

camel.societies.workforce.task_channel module#

class camel.societies.workforce.task_channel.Packet(task: Task, publisher_id: str, assignee_id: str | None = None, status: PacketStatus = PacketStatus.SENT)[source]#

Bases: object

The basic element inside the channel. A task is wrapped inside a packet. The packet will contain the task, along with the task’s assignee, and the task’s status.

Parameters:
  • task (Task) – The task that is wrapped inside the packet.

  • publisher_id (str) – The ID of the workforce that published the task.

  • assignee_id (str) – The ID of the workforce that is assigned to the task. Defaults to None, meaning that the task is posted as a dependency in the channel.

task#

The task that is wrapped inside the packet.

Type:

Task

publisher_id#

The ID of the workforce that published the task.

Type:

str

assignee_id#

The ID of the workforce that is assigned to the task. It is None if the task is a dependency. Defaults to None.

Type:

Optional[str], optional

status#

The status of the task.

Type:

PacketStatus

class camel.societies.workforce.task_channel.PacketStatus(value)[source]#

Bases: Enum

The status of a packet. The packet can be in one of the following states:

  • SENT: The packet has been sent to a worker.

  • RETURNED: The packet has been returned by the worker, meaning that the status of the task inside has been updated.

  • ARCHIVED: The packet has been archived, meaning that the content of the task inside will no longer change. The task is treated as a dependency.

ARCHIVED = 'ARCHIVED'#
RETURNED = 'RETURNED'#
SENT = 'SENT'#
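
The packet fields and states described above can be illustrated with a plain dataclass. This is a sketch under stated assumptions rather than the library's implementation: `SketchPacket` and `SketchStatus` are hypothetical names, a string stands in for a `Task`, and the SENT, RETURNED, ARCHIVED ordering shown is one plausible lifecycle inferred from the state descriptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class SketchStatus(Enum):
    """Hypothetical mirror of the three documented packet states."""

    SENT = "SENT"
    RETURNED = "RETURNED"
    ARCHIVED = "ARCHIVED"


@dataclass
class SketchPacket:
    """Hypothetical mirror of the documented Packet fields."""

    task: str  # stand-in for a Task object
    publisher_id: str
    assignee_id: Optional[str] = None
    status: SketchStatus = SketchStatus.SENT


# A task posted to a worker starts out as SENT.
packet = SketchPacket(
    task="summarize report",
    publisher_id="coordinator",
    assignee_id="worker-1",
)
history = [packet.status.value]

# The worker hands the task back (assumed ordering).
packet.status = SketchStatus.RETURNED
history.append(packet.status.value)

# The publisher archives it, so it can serve as a dependency.
packet.status = SketchStatus.ARCHIVED
history.append(packet.status.value)

# A dependency has no assignee and is archived from the start.
dependency = SketchPacket(
    task="background notes",
    publisher_id="coordinator",
    status=SketchStatus.ARCHIVED,
)
```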

class camel.societies.workforce.task_channel.TaskChannel[source]#

Bases: object

An internal class used by Workforce to manage tasks.

async archive_task(task_id: str) None[source]#

Archive a task in the channel, turning it into a dependency.

async get_assigned_task_by_assignee(assignee_id: str) Task[source]#

Get a task from the channel that has been assigned to the assignee.

async get_channel_debug_info() str[source]#

Get the debug information of the channel.

async get_dependency_ids() List[str][source]#

Get the IDs of all dependencies in the channel.

async get_returned_task_by_publisher(publisher_id: str) Task[source]#

Get a task from the channel that was published by the given publisher and has since been returned.

async get_task_by_id(task_id: str) Task[source]#

Get a task from the channel by its ID.

async post_dependency(dependency: Task, publisher_id: str) None[source]#

Post a dependency to the channel. A dependency is a task that is archived, and will be referenced by other tasks.

async post_task(task: Task, publisher_id: str, assignee_id: str) None[source]#

Send a task to the channel with specified publisher and assignee, along with the dependency of the task.

async remove_task(task_id: str) None[source]#

Remove a task from the channel.

async return_task(task_id: str) None[source]#

Return a task to the sender, indicating that the task has been processed by the worker.
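
The coroutine methods listed above suggest a post, fetch, return, archive round trip. The sketch below imitates that flow with `asyncio.Condition`; it is a hypothetical in-memory stand-in, not CAMEL's `TaskChannel`. The class name `SketchChannel`, the dictionary layout, and the use of plain task-ID strings in place of `Task` objects are all assumptions.

```python
import asyncio
from typing import Dict, List


class SketchChannel:
    """Hypothetical in-memory channel keyed by task ID."""

    def __init__(self) -> None:
        self._cond = asyncio.Condition()
        # task_id -> {"publisher": ..., "assignee": ..., "status": ...}
        self._packets: Dict[str, Dict[str, str]] = {}

    async def post_task(self, task_id: str, publisher_id: str,
                        assignee_id: str) -> None:
        async with self._cond:
            self._packets[task_id] = {
                "publisher": publisher_id,
                "assignee": assignee_id,
                "status": "SENT",
            }
            self._cond.notify_all()

    async def _wait_for(self, key: str, value: str, status: str) -> str:
        # Block until some packet matches, then return its task ID.
        async with self._cond:
            while True:
                for task_id, packet in self._packets.items():
                    if packet[key] == value and packet["status"] == status:
                        return task_id
                await self._cond.wait()

    async def get_assigned_task_by_assignee(self, assignee_id: str) -> str:
        return await self._wait_for("assignee", assignee_id, "SENT")

    async def get_returned_task_by_publisher(self, publisher_id: str) -> str:
        return await self._wait_for("publisher", publisher_id, "RETURNED")

    async def return_task(self, task_id: str) -> None:
        async with self._cond:
            self._packets[task_id]["status"] = "RETURNED"
            self._cond.notify_all()

    async def archive_task(self, task_id: str) -> None:
        async with self._cond:
            self._packets[task_id]["status"] = "ARCHIVED"
            self._cond.notify_all()

    async def get_dependency_ids(self) -> List[str]:
        async with self._cond:
            return [task_id for task_id, packet in self._packets.items()
                    if packet["status"] == "ARCHIVED"]


async def demo():
    # Create the channel inside the running event loop.
    channel = SketchChannel()

    async def worker() -> None:
        task_id = await channel.get_assigned_task_by_assignee("worker-1")
        await channel.return_task(task_id)  # pretend the work is done

    worker_job = asyncio.create_task(worker())
    await channel.post_task("task-0", "coordinator", "worker-1")
    returned = await channel.get_returned_task_by_publisher("coordinator")
    await channel.archive_task(returned)
    await worker_job
    return returned, await channel.get_dependency_ids()


returned_id, dependency_ids = asyncio.run(demo())
```

The condition variable lets the getters wait without polling: every state change calls `notify_all()`, and each waiter rechecks the packets when it wakes up.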

camel.societies.workforce.utils module#

class camel.societies.workforce.utils.TaskAssignResult(*, assignee_id: str)[source]#

Bases: BaseModel

The result of task assignment.

assignee_id: str#
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'assignee_id': FieldInfo(annotation=str, required=True, description='The ID of the workforce that is assigned to the task.')}#

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

class camel.societies.workforce.utils.TaskResult(*, content: str, failed: bool)[source]#

Bases: BaseModel

The result of a task.

content: str#
failed: bool#
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'content': FieldInfo(annotation=str, required=True, description='The result of the task.'), 'failed': FieldInfo(annotation=bool, required=True, description='Flag indicating whether the task processing failed.')}#

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

class camel.societies.workforce.utils.WorkerConf(*, role: str, sys_msg: str, description: str)[source]#

Bases: BaseModel

The configuration of a worker.

description: str#
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; it should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[Dict[str, FieldInfo]] = {'description': FieldInfo(annotation=str, required=True, description='The description of the new work node itself.'), 'role': FieldInfo(annotation=str, required=True, description='The role of the agent working in the work node.'), 'sys_msg': FieldInfo(annotation=str, required=True, description='The system message that will be sent to the agent in the node.')}#

Metadata about the fields defined on the model: a mapping of field names to pydantic.fields.FieldInfo objects.

This replaces Model.__fields__ from Pydantic V1.

role: str#
sys_msg: str#

camel.societies.workforce.utils.check_if_running(running: bool) Callable[source]#

Check whether the workforce is running or not running, as specified by the boolean value running. If the workforce is not in the expected state, raise an exception.

Raises:

RuntimeError – If the workforce is not in the expected status.
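
A guard of this kind is commonly written as a decorator factory. The following is a minimal sketch of that pattern, not the library's code: `check_running_sketch` and `DemoNode` are hypothetical names, and reading the state from a `_running` attribute on the instance is an assumption.

```python
import functools
from typing import Any, Callable


def check_running_sketch(running: bool) -> Callable:
    """Return a decorator that enforces the expected running state."""

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            # `_running` is an assumed attribute name on the node.
            if self._running != running:
                expected = "running" if running else "not running"
                raise RuntimeError(
                    f"{func.__name__}() requires the node to be {expected}."
                )
            return func(self, *args, **kwargs)

        return wrapper

    return decorator


class DemoNode:
    def __init__(self) -> None:
        self._running = False

    @check_running_sketch(False)
    def reset(self) -> str:
        return "reset done"

    @check_running_sketch(True)
    def stop(self) -> str:
        self._running = False
        return "stopped"


demo_node = DemoNode()
reset_result = demo_node.reset()  # allowed: the node is not running

try:
    demo_node.stop()  # rejected: the node was never started
    stop_error = None
except RuntimeError as exc:
    stop_error = str(exc)
```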

camel.societies.workforce.worker module#

class camel.societies.workforce.worker.Worker(description: str)[source]#

Bases: BaseNode, ABC

A worker node that works on tasks. It is the basic unit of task processing in the workforce system.

Parameters:

description (str) – Description of the node.

set_channel(channel: TaskChannel)[source]#

Sets the channel for the node.

start()[source]#

Start the worker.

stop()[source]#

Stop the worker.

camel.societies.workforce.workforce module#

class camel.societies.workforce.workforce.Workforce(description: str, children: List[BaseNode] | None = None, coordinator_agent_kwargs: Dict | None = None, task_agent_kwargs: Dict | None = None, new_worker_agent_kwargs: Dict | None = None)[source]#

Bases: BaseNode

A system where multiple worker nodes (agents) cooperate to solve tasks. It can assign tasks to worker nodes and can also apply strategies such as creating new workers or decomposing tasks to handle situations where a task fails.

Parameters:
  • description (str) – Description of the node.

  • children (Optional[List[BaseNode]], optional) – List of child nodes under this node. Each child node can be a worker node or another workforce node. (default: None)

  • coordinator_agent_kwargs (Optional[Dict], optional) – Keyword arguments for the coordinator agent, e.g. model, api_key, tools, etc. (default: None)

  • task_agent_kwargs (Optional[Dict], optional) – Keyword arguments for the task agent, e.g. model, api_key, tools, etc. (default: None)

  • new_worker_agent_kwargs (Optional[Dict]) – Default keyword arguments for the worker agent that will be created during runtime to handle failed tasks, e.g. model, api_key, tools, etc. (default: None)

add_role_playing_worker(description: str, assistant_role_name: str, user_role_name: str, assistant_agent_kwargs: Dict | None = None, user_agent_kwargs: Dict | None = None, chat_turn_limit: int = 3) Workforce[source]#

Add a worker node that uses the RolePlaying system to the workforce.

Parameters:
  • description (str) – Description of the node.

  • assistant_role_name (str) – The role name of the assistant agent.

  • user_role_name (str) – The role name of the user agent.

  • assistant_agent_kwargs (Optional[Dict], optional) – The keyword arguments to initialize the assistant agent in the role playing, like the model name, etc. Defaults to None.

  • user_agent_kwargs (Optional[Dict], optional) – The keyword arguments to initialize the user agent in the role playing, like the model name, etc. Defaults to None.

  • chat_turn_limit (int, optional) – The maximum number of chat turns in the role playing. Defaults to 3.

Returns:

The workforce node itself.

Return type:

Workforce

add_single_agent_worker(description: str, worker: ChatAgent) Workforce[source]#

Add a worker node to the workforce that uses a single agent.

Parameters:
  • description (str) – Description of the worker node.

  • worker (ChatAgent) – The agent to be added.

Returns:

The workforce node itself.

Return type:

Workforce

add_workforce(workforce: Workforce) Workforce[source]#

Add a workforce node to the workforce.

Parameters:

workforce (Workforce) – The workforce node to be added.

Returns:

The workforce node itself.

Return type:

Workforce

process_task(task: Task) Task[source]#

The main entry point for the workforce to process a task. It will start the workforce and all the child nodes under it, process the task provided and return the updated task.

Parameters:

task (Task) – The task to be processed.

Returns:

The updated task.

Return type:

Task

reset() None[source]#

Reset the workforce and all the child nodes under it. Can only be called when the workforce is not running.

set_channel(channel: TaskChannel) None[source]#

Set the channel for the node and all the child nodes under it.

start() None[source]#

Start itself and all the child nodes under it.

stop() None[source]#

Stop all the child nodes under it. The node itself will be stopped by its parent node.
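
Because each `add_*` method returns the workforce itself, calls can be chained, and because a child can be another workforce, the result is a tree. The sketch below shows only that shape with hypothetical stand-ins (`SketchWorkforce`, `SketchWorker`, `add_worker`, `outline`); it does not create agents or process tasks and is not the CAMEL API.

```python
from typing import List, Optional, Union


class SketchWorker:
    """Hypothetical leaf node with just a description."""

    def __init__(self, description: str) -> None:
        self.description = description


class SketchWorkforce:
    """Hypothetical composite node whose children may be workforces."""

    def __init__(
        self,
        description: str,
        children: Optional[List[Union["SketchWorkforce", SketchWorker]]] = None,
    ) -> None:
        self.description = description
        self.children = list(children or [])

    def add_worker(self, description: str) -> "SketchWorkforce":
        self.children.append(SketchWorker(description))
        return self  # returning self is what enables chaining

    def add_workforce(self, workforce: "SketchWorkforce") -> "SketchWorkforce":
        self.children.append(workforce)
        return self

    def outline(self, depth: int = 0) -> List[str]:
        # Walk the tree depth-first, indenting two spaces per level.
        lines = ["  " * depth + self.description]
        for child in self.children:
            if isinstance(child, SketchWorkforce):
                lines.extend(child.outline(depth + 1))
            else:
                lines.append("  " * (depth + 1) + child.description)
        return lines


research = (
    SketchWorkforce("research team")
    .add_worker("web searcher")
    .add_worker("summarizer")
)
root = (
    SketchWorkforce("project team")
    .add_worker("planner")
    .add_workforce(research)
)
tree = root.outline()
```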

Module contents#

class camel.societies.workforce.RolePlayingWorker(description: str, assistant_role_name: str, user_role_name: str, assistant_agent_kwargs: Dict | None = None, user_agent_kwargs: Dict | None = None, chat_turn_limit: int = 3)[source]#

Bases: Worker

A worker node that runs a role-playing session between an assistant agent and a user agent.

Parameters:
  • description (str) – Description of the node.

  • assistant_role_name (str) – The role name of the assistant agent.

  • user_role_name (str) – The role name of the user agent.

  • assistant_agent_kwargs (Optional[Dict], optional) – The keyword arguments to initialize the assistant agent in the role playing, like the model name, etc. Defaults to None.

  • user_agent_kwargs (Optional[Dict], optional) – The keyword arguments to initialize the user agent in the role playing, like the model name, etc. Defaults to None.

  • chat_turn_limit (int, optional) – The maximum number of chat turns in the role playing. Defaults to 3.

class camel.societies.workforce.SingleAgentWorker(description: str, worker: ChatAgent)[source]#

Bases: Worker

A worker node that consists of a single agent.

Parameters:
  • description (str) – Description of the node.

  • worker (ChatAgent) – Worker of the node. A single agent.

reset() Any[source]#

Resets the worker to its initial state.

class camel.societies.workforce.Workforce(description: str, children: List[BaseNode] | None = None, coordinator_agent_kwargs: Dict | None = None, task_agent_kwargs: Dict | None = None, new_worker_agent_kwargs: Dict | None = None)[source]#

Bases: BaseNode

A system where multiple worker nodes (agents) cooperate to solve tasks. It can assign tasks to worker nodes and can also apply strategies such as creating new workers or decomposing tasks to handle situations where a task fails.

Parameters:
  • description (str) – Description of the node.

  • children (Optional[List[BaseNode]], optional) – List of child nodes under this node. Each child node can be a worker node or another workforce node. (default: None)

  • coordinator_agent_kwargs (Optional[Dict], optional) – Keyword arguments for the coordinator agent, e.g. model, api_key, tools, etc. (default: None)

  • task_agent_kwargs (Optional[Dict], optional) – Keyword arguments for the task agent, e.g. model, api_key, tools, etc. (default: None)

  • new_worker_agent_kwargs (Optional[Dict]) – Default keyword arguments for the worker agent that will be created during runtime to handle failed tasks, e.g. model, api_key, tools, etc. (default: None)

add_role_playing_worker(description: str, assistant_role_name: str, user_role_name: str, assistant_agent_kwargs: Dict | None = None, user_agent_kwargs: Dict | None = None, chat_turn_limit: int = 3) Workforce[source]#

Add a worker node that uses the RolePlaying system to the workforce.

Parameters:
  • description (str) – Description of the node.

  • assistant_role_name (str) – The role name of the assistant agent.

  • user_role_name (str) – The role name of the user agent.

  • assistant_agent_kwargs (Optional[Dict], optional) – The keyword arguments to initialize the assistant agent in the role playing, like the model name, etc. Defaults to None.

  • user_agent_kwargs (Optional[Dict], optional) – The keyword arguments to initialize the user agent in the role playing, like the model name, etc. Defaults to None.

  • chat_turn_limit (int, optional) – The maximum number of chat turns in the role playing. Defaults to 3.

Returns:

The workforce node itself.

Return type:

Workforce

add_single_agent_worker(description: str, worker: ChatAgent) Workforce[source]#

Add a worker node to the workforce that uses a single agent.

Parameters:
  • description (str) – Description of the worker node.

  • worker (ChatAgent) – The agent to be added.

Returns:

The workforce node itself.

Return type:

Workforce

add_workforce(workforce: Workforce) Workforce[source]#

Add a workforce node to the workforce.

Parameters:

workforce (Workforce) – The workforce node to be added.

Returns:

The workforce node itself.

Return type:

Workforce

process_task(task: Task) Task[source]#

The main entry point for the workforce to process a task. It will start the workforce and all the child nodes under it, process the task provided and return the updated task.

Parameters:

task (Task) – The task to be processed.

Returns:

The updated task.

Return type:

Task

reset() None[source]#

Reset the workforce and all the child nodes under it. Can only be called when the workforce is not running.

set_channel(channel: TaskChannel) None[source]#

Set the channel for the node and all the child nodes under it.

start() None[source]#

Start itself and all the child nodes under it.

stop() None[source]#

Stop all the child nodes under it. The node itself will be stopped by its parent node.