Workforce

class Workforce(BaseNode):

A system in which multiple worker nodes (agents) cooperate to solve tasks. It assigns tasks to worker nodes and, when a task fails, can apply recovery strategies such as creating a new worker or decomposing the task.

The workforce uses three specialized ChatAgents internally:

  • Coordinator Agent: Assigns tasks to workers based on their capabilities
  • Task Planner Agent: Decomposes complex tasks and composes results
  • Dynamic Workers: Created at runtime when tasks fail repeatedly

Parameters:

  • description (str): Description of the workforce.
  • children (Optional[List[BaseNode]], optional): List of child nodes under this node. Each child node can be a worker node or another workforce node. (default: None)
  • coordinator_agent_kwargs (Optional[Dict], optional): Keyword arguments passed directly to the coordinator ChatAgent constructor. The coordinator manages task assignment and failure handling strategies. See the ChatAgent documentation for all available parameters. (default: None, which uses ModelPlatformType.DEFAULT and ModelType.DEFAULT)
  • task_agent_kwargs (Optional[Dict], optional): Keyword arguments passed directly to the task planning ChatAgent constructor. The task agent handles task decomposition into subtasks and result composition. See the ChatAgent documentation for all available parameters. (default: None, which uses ModelPlatformType.DEFAULT and ModelType.DEFAULT)
  • new_worker_agent_kwargs (Optional[Dict], optional): Default keyword arguments passed to the ChatAgent constructor for workers created dynamically at runtime when existing workers cannot handle failed tasks. See the ChatAgent documentation for all available parameters. (default: None, which creates workers with SearchToolkit, CodeExecutionToolkit, and ThinkingToolkit)
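
The three kwargs parameters above share one pattern: None means "use the defaults", while a dict is unpacked into the agent constructor. The following is a minimal sketch of that pattern only. StubAgent, its fields, and the build_agent helper are hypothetical stand-ins for illustration, not the real ChatAgent or any library function.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class StubAgent:
    """Hypothetical stand-in for ChatAgent; the fields are illustrative only."""

    system_message: str = "default system message"
    model: str = "default-model"


def build_agent(agent_kwargs: Optional[Dict] = None) -> StubAgent:
    # None falls back to the constructor defaults; a dict is unpacked as-is.
    return StubAgent(**(agent_kwargs or {}))


default_coordinator = build_agent(None)
custom_coordinator = build_agent({"model": "my-model"})
```

Only the keys present in the dict override defaults; everything else keeps the constructor's own default value.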

__init__

def __init__(
    self,
    description: str,
    children: Optional[List[BaseNode]] = None,
    coordinator_agent_kwargs: Optional[Dict] = None,
    task_agent_kwargs: Optional[Dict] = None,
    new_worker_agent_kwargs: Optional[Dict] = None
):

__repr__

def __repr__(self):

_decompose_task

def _decompose_task(self, task: Task):

Returns:

List[Task]: The subtasks.

process_task

def process_task(self, task: Task):

The main entry point for the workforce to process a task. It starts the workforce and all the child nodes under it, processes the given task, and returns the updated task.

Parameters:

  • task (Task): The task to be processed.

Returns:

Task: The updated task.
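
The flow described above (decompose, assign, run, compose) can be sketched with plain functions. This is a toy model under stated assumptions: ToyTask, its fields, and the callables passed in are hypothetical, and the sketch omits failure handling, channels, and asynchronous execution.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class ToyTask:
    """Hypothetical stand-in for Task; only the fields this sketch needs."""

    content: str
    result: str = ""
    done: bool = False
    subtasks: List["ToyTask"] = field(default_factory=list)


def process_task(
    task: ToyTask,
    decompose: Callable[[ToyTask], List[ToyTask]],
    find_assignee: Callable[[ToyTask], str],
    workers: Dict[str, Callable[[str], str]],
) -> ToyTask:
    # 1. Split the task into subtasks (the task planner's role).
    task.subtasks = decompose(task)
    # 2. Route each subtask to a worker (the coordinator's role) and run it.
    for subtask in task.subtasks:
        worker = workers[find_assignee(subtask)]
        subtask.result = worker(subtask.content)
        subtask.done = True
    # 3. Compose the subtask results and return the same, updated task.
    task.result = "\n".join(s.result for s in task.subtasks)
    task.done = True
    return task


def split_on_and(task: ToyTask) -> List[ToyTask]:
    return [ToyTask(part) for part in task.content.split(" and ")]


def route(subtask: ToyTask) -> str:
    return "shouter" if subtask.content.startswith("shout") else "reverser"


toy_workers = {"shouter": str.upper, "reverser": lambda text: text[::-1]}
finished = process_task(ToyTask("shout hi and abc"), split_on_and, route, toy_workers)
```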

add_single_agent_worker

def add_single_agent_worker(self, description: str, worker: ChatAgent):

Add a worker node to the workforce that uses a single agent.

Parameters:

  • description (str): Description of the worker node.
  • worker (ChatAgent): The agent to be added.

Returns:

Workforce: The workforce node itself.

add_role_playing_worker

def add_role_playing_worker(
    self,
    description: str,
    assistant_role_name: str,
    user_role_name: str,
    assistant_agent_kwargs: Optional[Dict] = None,
    user_agent_kwargs: Optional[Dict] = None,
    summarize_agent_kwargs: Optional[Dict] = None,
    chat_turn_limit: int = 3
):

Add a worker node to the workforce that uses a RolePlaying system.

Parameters:

  • description (str): Description of the node.
  • assistant_role_name (str): The role name of the assistant agent.
  • user_role_name (str): The role name of the user agent.
  • assistant_agent_kwargs (Optional[Dict]): The keyword arguments to initialize the assistant agent in the role playing, like the model name, etc. (default: None)
  • user_agent_kwargs (Optional[Dict]): The keyword arguments to initialize the user agent in the role playing, like the model name, etc. (default: None)
  • summarize_agent_kwargs (Optional[Dict]): The keyword arguments to initialize the summarize agent, like the model name, etc. (default: None)
  • chat_turn_limit (int): The maximum number of chat turns in the role playing. (default: 3)

Returns:

Workforce: The workforce node itself.

add_workforce

def add_workforce(self, workforce: Workforce):

Add a workforce node to the workforce.

Parameters:

  • workforce (Workforce): The workforce node to be added.

Returns:

Workforce: The workforce node itself.
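
Because the add_* methods return the workforce node itself, calls can be chained. The sketch below illustrates only that "return self" contract. ToyWorkforce is a hypothetical stand-in, and the placeholder object() values take the place of real agents.

```python
from typing import List


class ToyWorkforce:
    """Hypothetical stand-in that mirrors only the 'return self' contract."""

    def __init__(self, description: str) -> None:
        self.description = description
        self.children: List[object] = []

    def add_single_agent_worker(self, description: str, worker: object) -> "ToyWorkforce":
        self.children.append((description, worker))
        return self  # returning self is what makes chaining possible

    def add_workforce(self, workforce: "ToyWorkforce") -> "ToyWorkforce":
        self.children.append(workforce)
        return self


research = ToyWorkforce("Research team").add_single_agent_worker(
    "Searches the web", object()
)
team = (
    ToyWorkforce("Top-level team")
    .add_single_agent_worker("Writes reports", object())
    .add_workforce(research)
)
```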

reset

def reset(self):

Reset the workforce and all the child nodes under it. Can only be called when the workforce is not running.
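
A minimal sketch of the two properties stated above: the reset cascades to child nodes, and it is refused while the node is running. ToyNode and its attributes are hypothetical, and raising RuntimeError is an assumption about how the precondition might be enforced.

```python
from typing import List, Optional


class ToyNode:
    """Hypothetical stand-in for a workforce node."""

    def __init__(self, name: str, children: Optional[List["ToyNode"]] = None) -> None:
        self.name = name
        self.children = list(children or [])
        self.running = False
        self.log: List[str] = []

    def reset(self) -> None:
        # Assumed behaviour: refuse to reset a node that is still running.
        if self.running:
            raise RuntimeError(f"{self.name} is running and cannot be reset")
        self.log.clear()
        for child in self.children:
            child.reset()


leaf = ToyNode("worker")
root = ToyNode("workforce", [leaf])
leaf.log.append("old result")
root.reset()  # clears the root and, recursively, the leaf
```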

set_channel

def set_channel(self, channel: TaskChannel):

Set the channel for the node and all the child nodes under it.
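
Since a child node can itself be a workforce, setting the channel amounts to a recursive walk over the node tree. The sketch below shows that propagation with a hypothetical ToyNode class and a plain object() standing in for a TaskChannel.

```python
from typing import List, Optional


class ToyNode:
    """Hypothetical stand-in for a node that holds a channel reference."""

    def __init__(self, name: str, children: Optional[List["ToyNode"]] = None) -> None:
        self.name = name
        self.children = list(children or [])
        self.channel: Optional[object] = None

    def set_channel(self, channel: object) -> None:
        # Store the channel here, then pass the same object down the tree.
        self.channel = channel
        for child in self.children:
            child.set_channel(channel)


worker_a = ToyNode("worker-a")
worker_b = ToyNode("worker-b")
inner = ToyNode("inner workforce", [worker_b])
outer = ToyNode("outer workforce", [worker_a, inner])
shared_channel = object()
outer.set_channel(shared_channel)
```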

_get_child_nodes_info

def _get_child_nodes_info(self):

Get the information of all the child nodes under this node.

_find_assignee

def _find_assignee(self, task: Task):

Assigns a task to a worker node with the best capability.

Parameters:

  • task (Task): The task to be assigned.

Returns:

str: ID of the worker node to be assigned.
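
The contract here is "task in, worker ID out". In the workforce, the coordinator agent makes this choice. The sketch below swaps in a much simpler technique, word overlap between the task text and each worker's description, purely to make the contract concrete. The find_assignee function and the worker IDs are hypothetical.

```python
from typing import Dict


def find_assignee(task_content: str, workers: Dict[str, str]) -> str:
    """Return the ID of the worker whose description shares the most words
    with the task. This is a toy heuristic, not the coordinator's logic."""
    task_words = set(task_content.lower().split())

    def overlap(worker_id: str) -> int:
        description_words = set(workers[worker_id].lower().split())
        return len(task_words & description_words)

    # On a tie, max() keeps the first worker in insertion order.
    return max(workers, key=overlap)


worker_descriptions = {
    "worker-1": "searches the web for recent news",
    "worker-2": "writes and runs python code",
}
assignee = find_assignee("Write python code to parse a file", worker_descriptions)
```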

_create_worker_node_for_task

def _create_worker_node_for_task(self, task: Task):

Creates a new worker node for a given task and adds it to the children list of this node. This is one of the actions that the coordinator can take when a task has failed.

Parameters:

  • task (Task): The task for which the worker node is created.

Returns:

Worker: The created worker node.
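
A minimal sketch of this fallback, assuming a worker signals failure by returning None: when every existing child fails, a new worker is created for the task and appended to the children list. The function names, the Worker type alias, and the failure convention are all hypothetical.

```python
from typing import Callable, List, Optional

# Hypothetical worker type: returns None when it cannot handle the task.
Worker = Callable[[str], Optional[str]]


def run_with_new_worker_fallback(
    task: str,
    children: List[Worker],
    create_worker: Callable[[str], Worker],
) -> Optional[str]:
    # Try the existing workers first.
    for worker in list(children):
        result = worker(task)
        if result is not None:
            return result
    # Every existing worker failed: create one for this task and keep it.
    new_worker = create_worker(task)
    children.append(new_worker)
    return new_worker(task)


def always_fails(task: str) -> Optional[str]:
    return None


def create_specialist(task: str) -> Worker:
    return lambda text: f"handled: {text}"


children: List[Worker] = [always_fails]
outcome = run_with_new_worker_fallback("translate report", children, create_specialist)
```

Keeping the new worker in the children list means later tasks of the same kind can be routed to it directly.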

_create_new_agent

def _create_new_agent(self, role: str, sys_msg: str):

stop

def stop(self):

Stop all the child nodes under it. The node itself will be stopped by its parent node.

clone

def clone(self, with_memory: bool = False):

Creates a new instance of Workforce with the same configuration.

Parameters:

  • with_memory (bool, optional): Whether to copy the memory (conversation history) to the new instance. If True, the new instance will have the same conversation history. If False, the new instance will have a fresh memory. (default: False)

Returns:

Workforce: A new instance of Workforce with the same configuration.
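
The effect of with_memory can be sketched with a toy class whose memory is a list of strings. ToyWorkforce is a hypothetical stand-in, and treating the copied history as independent of the original (a deep copy) is an assumption of this sketch.

```python
import copy
from typing import List, Optional


class ToyWorkforce:
    """Hypothetical stand-in with a description and a list-based memory."""

    def __init__(self, description: str, memory: Optional[List[str]] = None) -> None:
        self.description = description
        self.memory: List[str] = memory if memory is not None else []

    def clone(self, with_memory: bool = False) -> "ToyWorkforce":
        # Same configuration; history is copied only when requested.
        memory = copy.deepcopy(self.memory) if with_memory else []
        return ToyWorkforce(self.description, memory)


original = ToyWorkforce("Research team", ["user: hello", "assistant: hi"])
fresh = original.clone()
carried = original.clone(with_memory=True)
carried.memory.append("user: new turn")  # leaves the original untouched
```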