
# camel.societies.workforce.workforce

<a id="camel.societies.workforce.workforce" />

<a id="camel.societies.workforce.workforce.WorkforceState" />

## WorkforceState

```python theme={"system"}
class WorkforceState(Enum):
```

Workforce execution state for human intervention support.

<a id="camel.societies.workforce.workforce.WorkforceMode" />

## WorkforceMode

```python theme={"system"}
class WorkforceMode(Enum):
```

Workforce execution mode for different task processing strategies.

<a id="camel.societies.workforce.workforce.WorkforceSnapshot" />

## WorkforceSnapshot

```python theme={"system"}
class WorkforceSnapshot:
```

Snapshot of workforce state for resuming execution.

<a id="camel.societies.workforce.workforce.WorkforceSnapshot.__init__" />

### \_\_init\_\_

```python theme={"system"}
def __init__(
    self,
    main_task: Optional[Task] = None,
    pending_tasks: Optional[Deque[Task]] = None,
    completed_tasks: Optional[List[Task]] = None,
    task_dependencies: Optional[Dict[str, List[str]]] = None,
    assignees: Optional[Dict[str, str]] = None,
    current_task_index: int = 0,
    description: str = ''
):
```

<a id="camel.societies.workforce.workforce.Workforce" />

## Workforce

```python theme={"system"}
class Workforce(BaseNode):
```

A system in which multiple worker nodes (agents) cooperate to solve
tasks. It assigns tasks to worker nodes and, when a task fails, applies
recovery strategies such as creating a new worker or decomposing the
task.

The workforce uses three specialized ChatAgents internally:

* Coordinator Agent: Assigns tasks to workers based on their
  capabilities
* Task Planner Agent: Decomposes complex tasks and composes results
* Dynamic Workers: Created at runtime when tasks fail repeatedly

**Parameters:**

* **description** (str): Description of the workforce.
* **children** (Optional\[List\[BaseNode]], optional): List of child nodes under this node. Each child node can be a worker node or another workforce node. (default: :obj:`None`)
* **coordinator\_agent** (Optional\[ChatAgent], optional): A custom coordinator agent instance for task assignment and worker creation. If provided, the workforce will create a new agent using this agent's model configuration but with the required system message and functionality. If None, a default agent will be created using DEFAULT model settings. (default: :obj:`None`)
* **task\_agent** (Optional\[ChatAgent], optional): A custom task planning agent instance for task decomposition and composition. If provided, the workforce will create a new agent using this agent's model configuration but with the required system message. If None, a default agent will be created using DEFAULT model settings. (default: :obj:`None`)
* **new\_worker\_agent** (Optional\[ChatAgent], optional): A template agent for workers created dynamically at runtime when existing workers cannot handle failed tasks. If None, workers will be created with default settings including SearchToolkit, CodeExecutionToolkit, and ThinkingToolkit. (default: :obj:`None`)
* **default\_model** (Optional\[Union\[BaseModelBackend, ModelManager]], optional): Model backend or manager to use when creating default coordinator, task, or dynamic worker agents. If None, agents will be created using ModelPlatformType.DEFAULT and ModelType.DEFAULT settings. (default: :obj:`None`)
* **graceful\_shutdown\_timeout** (float, optional): The timeout in seconds for graceful shutdown when a task fails 3 times. During this period, the workforce remains active for debugging. Set to 0 for immediate shutdown. (default: :obj:`15.0`)
* **task\_timeout\_seconds** (Optional\[float], optional): The timeout in seconds for waiting for tasks to be returned by workers. If None, uses the global TASK\_TIMEOUT\_SECONDS value (600.0 seconds). Increase this value for tasks that require more processing time. (default: :obj:`None`)
* **share\_memory** (bool, optional): Whether to enable shared memory across SingleAgentWorker instances in the workforce. When enabled, all SingleAgentWorker instances, coordinator agent, and task planning agent will share their complete conversation history and function-calling trajectory, providing better context for task handoffs and continuity. Note: Currently only supports SingleAgentWorker instances; RolePlayingWorker and nested Workforce instances do not participate in memory sharing. (default: :obj:`False`)
* **use\_structured\_output\_handler** (bool, optional): Whether to use the structured output handler instead of native structured output. When enabled, the workforce will use prompts with structured output instructions and regex extraction to parse responses. This ensures compatibility with agents that don't reliably support native structured output. When disabled, the workforce uses the native response\_format parameter. (default: :obj:`True`)
* **callbacks** (Optional\[List\[WorkforceCallback]], optional): A list of callback handlers to observe and record workforce lifecycle events and metrics (e.g., task creation/assignment/start/completion/failure, worker creation/deletion, all-tasks-completed). All items must be instances of :class:`WorkforceCallback`; otherwise a :class:`ValueError` is raised. If none of the provided callbacks implement :class:`WorkforceMetrics`, a built-in :class:`WorkforceLogger` (which implements both callback and metrics) is added automatically. If at least one provided callback implements :class:`WorkforceMetrics`, no default logger is added. (default: :obj:`None`)
* **mode** (WorkforceMode, optional): The execution mode for task processing. AUTO\_DECOMPOSE mode uses intelligent recovery strategies (decompose, replan, etc.) when tasks fail. PIPELINE mode uses simple retry logic and allows failed tasks to continue the workflow, passing error information to dependent tasks. (default: :obj:`WorkforceMode.AUTO_DECOMPOSE`)
* **failure\_handling\_config** (Optional\[Union\[FailureHandlingConfig, Dict]]): Configuration for customizing failure handling behavior. Can be a FailureHandlingConfig instance or a dict with the same fields. Allows fine-grained control over which recovery strategies are enabled, maximum retry attempts, and whether to halt on max retries. The `enabled_strategies` field accepts both enum values and string lists like `["retry", "replan"]`. If None, uses default configuration with all strategies enabled. (default: :obj:`None`)
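
The `enabled_strategies` field is described above as accepting both enum values and plain strings. A minimal sketch of that kind of normalization follows. The `Strategy` enum and the `normalize_strategies` helper are hypothetical stand-ins written for this illustration, not CAMEL's definitions; only the string values `"retry"` and `"replan"` come from the text above.

```python
from enum import Enum
from typing import Iterable, List, Union


class Strategy(Enum):
    # Hypothetical stand-in enum; only "retry" and "replan" appear in the docs.
    RETRY = "retry"
    REPLAN = "replan"


def normalize_strategies(
    values: Iterable[Union[Strategy, str]],
) -> List[Strategy]:
    """Convert a mixed list of enum members and strings into enum members."""
    normalized: List[Strategy] = []
    for value in values:
        if isinstance(value, Strategy):
            normalized.append(value)
        else:
            # Enum lookup by value raises ValueError for unknown strings.
            normalized.append(Strategy(value.strip().lower()))
    return normalized
```

With this sketch, `normalize_strategies(["retry", Strategy.REPLAN])` yields two enum members, and an unrecognized string raises `ValueError`.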

**Note:**

When a custom coordinator\_agent or task\_agent is provided, the workforce
preserves the user's system message and appends the required
workforce coordination or task planning instructions to it. This
preserves the user's intent while keeping the workforce functional.
All other agent configurations (model, memory, tools, etc.) are also
preserved.

<a id="camel.societies.workforce.workforce.Workforce.__init__" />

### \_\_init\_\_

```python theme={"system"}
def __init__(
    self,
    description: str,
    children: Optional[List[BaseNode]] = None,
    coordinator_agent: Optional[ChatAgent] = None,
    task_agent: Optional[ChatAgent] = None,
    new_worker_agent: Optional[ChatAgent] = None,
    default_model: Optional[Union[BaseModelBackend, ModelManager]] = None,
    graceful_shutdown_timeout: float = 15.0,
    share_memory: bool = False,
    use_structured_output_handler: bool = True,
    task_timeout_seconds: Optional[float] = None,
    mode: WorkforceMode = WorkforceMode.AUTO_DECOMPOSE,
    callbacks: Optional[List[WorkforceCallback]] = None,
    failure_handling_config: Optional[Union[FailureHandlingConfig, Dict[str, Any]]] = None
):
```

<a id="camel.societies.workforce.workforce.Workforce._initialize_callbacks" />

### \_initialize\_callbacks

```python theme={"system"}
def _initialize_callbacks(self, callbacks: Optional[List[WorkforceCallback]]):
```

Validate, register, and prime workforce callbacks.
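
The rules stated for the `callbacks` parameter (reject non-callback items, add a default logger only when nothing provides metrics) can be sketched as below. The three classes are empty stand-ins that reuse the documented names so the example runs on its own; they are not the CAMEL classes, and `initialize_callbacks` is a hypothetical free function, not the method itself.

```python
from typing import List, Optional


class WorkforceCallback:
    """Stand-in base class for lifecycle callbacks."""


class WorkforceMetrics:
    """Stand-in marker class for metrics collection."""


class WorkforceLogger(WorkforceCallback, WorkforceMetrics):
    """Stand-in default logger: both a callback and a metrics provider."""


def initialize_callbacks(
    callbacks: Optional[List[object]],
) -> List[WorkforceCallback]:
    registered: List[WorkforceCallback] = []
    for cb in callbacks or []:
        # Every item must be a callback instance.
        if not isinstance(cb, WorkforceCallback):
            raise ValueError(
                f"Expected WorkforceCallback, got {type(cb).__name__}"
            )
        registered.append(cb)
    # Add the default logger only when no callback provides metrics.
    if not any(isinstance(cb, WorkforceMetrics) for cb in registered):
        registered.append(WorkforceLogger())
    return registered
```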

<a id="camel.societies.workforce.workforce.Workforce._notify_worker_created" />

### \_notify\_worker\_created

```python theme={"system"}
def _notify_worker_created(self, worker_node: BaseNode):
```

Emit a worker-created event to all registered callbacks.

<a id="camel.societies.workforce.workforce.Workforce._get_or_create_shared_context_utility" />

### \_get\_or\_create\_shared\_context\_utility

```python theme={"system"}
def _get_or_create_shared_context_utility(self, session_id: Optional[str] = None):
```

Get or create the shared context utility for workflow management.

This method creates the context utility only when needed, avoiding
unnecessary session folder creation during initialization.

**Parameters:**

* **session\_id** (Optional\[str]): Custom session ID to use. If None, auto-generates a timestamped session ID. (default: :obj:`None`)

**Returns:**

ContextUtility: The shared context utility instance.

<a id="camel.societies.workforce.workforce.Workforce._get_role_identifier" />

### \_get\_role\_identifier

```python theme={"system"}
def _get_role_identifier(
    self,
    worker: ChatAgent,
    description: str,
    workflow_summary: Optional['WorkflowSummary'] = None
):
```

Extract role identifier for organizing workflows.

Uses priority fallback: role\_name → agent\_title (from
WorkflowSummary) → sanitized description.

**Parameters:**

* **worker** (ChatAgent): The worker agent to extract role from.
* **description** (str): Worker description to use as fallback.
* **workflow\_summary** (Optional\[WorkflowSummary]): Optional WorkflowSummary object that may contain agent\_title field.

**Returns:**

str: Role identifier for organizing workflows.
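
The priority fallback described above can be sketched as a plain function. This is an illustration only: it takes the three candidate strings directly instead of a `ChatAgent` and a `WorkflowSummary`, and both the sanitization rule (lowercase, non-alphanumeric runs replaced by underscores) and the `"unknown_role"` default are assumptions, not CAMEL's actual behavior.

```python
import re
from typing import Optional


def get_role_identifier(
    role_name: Optional[str],
    agent_title: Optional[str],
    description: str,
) -> str:
    # Priority: explicit role name, then the workflow summary's agent title.
    for candidate in (role_name, agent_title):
        if candidate and candidate.strip():
            return candidate.strip()
    # Last resort: a sanitized form of the worker description (assumed rule).
    sanitized = re.sub(r"[^a-z0-9]+", "_", description.lower()).strip("_")
    return sanitized or "unknown_role"
```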

<a id="camel.societies.workforce.workforce.Workforce._validate_agent_compatibility" />

### \_validate\_agent\_compatibility

```python theme={"system"}
def _validate_agent_compatibility(self, agent: ChatAgent, agent_context: str = 'agent'):
```

Validate that agent configuration is compatible with workforce
settings.

**Parameters:**

* **agent** (ChatAgent): The agent to validate.
* **agent\_context** (str): Context description for error messages.

<a id="camel.societies.workforce.workforce.Workforce._attach_pause_event_to_agent" />

### \_attach\_pause\_event\_to\_agent

```python theme={"system"}
def _attach_pause_event_to_agent(self, agent: ChatAgent):
```

Ensure the given ChatAgent shares this workforce's pause\_event.

If the agent already has a different pause\_event we overwrite it and
emit a debug log (it is unlikely an agent needs multiple independent
pause controls once managed by this workforce).

<a id="camel.societies.workforce.workforce.Workforce._ensure_pause_event_in_kwargs" />

### \_ensure\_pause\_event\_in\_kwargs

```python theme={"system"}
def _ensure_pause_event_in_kwargs(self, kwargs: Optional[Dict]):
```

Insert pause\_event into kwargs dict for ChatAgent construction.

<a id="camel.societies.workforce.workforce.Workforce.__repr__" />

### \_\_repr\_\_

```python theme={"system"}
def __repr__(self):
```

<a id="camel.societies.workforce.workforce.Workforce.set_mode" />

### set\_mode

```python theme={"system"}
def set_mode(self, mode: WorkforceMode):
```

Set the execution mode of the workforce.

This allows switching between AUTO\_DECOMPOSE and PIPELINE modes.
Useful when you want to reuse the same workforce instance for
different task processing strategies.

**Parameters:**

* **mode** (WorkforceMode): The desired execution mode.
  * AUTO\_DECOMPOSE: Intelligent task decomposition with recovery.
  * PIPELINE: Predefined task pipeline with simple retry logic.

**Returns:**

Workforce: Self for method chaining.

<a id="camel.societies.workforce.workforce.Workforce._ensure_pipeline_builder" />

### \_ensure\_pipeline\_builder

```python theme={"system"}
def _ensure_pipeline_builder(self):
```

**Returns:**

PipelineTaskBuilder: The initialized pipeline builder instance.

<a id="camel.societies.workforce.workforce.Workforce.pipeline_add" />

### pipeline\_add

```python theme={"system"}
def pipeline_add(
    self,
    content: Union[str, Task],
    task_id: Optional[str] = None,
    dependencies: Optional[List[str]] = None,
    additional_info: Optional[Dict[str, Any]] = None,
    auto_depend: bool = True
):
```

Add a task to the pipeline with support for chaining.

Accepts either a string for simple tasks or a Task object for
advanced usage with metadata, images, or custom configurations.

**Parameters:**

* **content** (Union\[str, Task]): The task content string or a Task object. If a Task object is provided, task\_id and additional\_info parameters are ignored.
* **task\_id** (str, optional): Unique identifier for the task. If None, a unique ID will be generated. Only used when content is a string. (default: :obj:`None`)
* **dependencies** (List\[str], optional): List of task IDs that this task depends on. If None and auto\_depend=True, will depend on the last added task. (default: :obj:`None`)
* **additional\_info** (Dict\[str, Any], optional): Additional information for the task. Only used when content is a string. (default: :obj:`None`)
* **auto\_depend** (bool, optional): If True and dependencies is None, automatically depend on the last added task. (default: :obj:`True`)

**Returns:**

Workforce: Self for method chaining.
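
To make the interaction between `dependencies` and `auto_depend` concrete, here is a small self-contained model of the chaining behavior described above. `ToyPipeline` is a hypothetical stand-in written for this page, not CAMEL's pipeline builder, and its generated-ID scheme is an assumption.

```python
from typing import Dict, List, Optional


class ToyPipeline:
    """Illustrative stand-in; not CAMEL's pipeline builder."""

    def __init__(self) -> None:
        self.contents: Dict[str, str] = {}
        self.dependencies: Dict[str, List[str]] = {}
        self._last_task_id: Optional[str] = None

    def add(
        self,
        content: str,
        task_id: Optional[str] = None,
        dependencies: Optional[List[str]] = None,
        auto_depend: bool = True,
    ) -> "ToyPipeline":
        if task_id is None:
            # Assumed ID scheme, for illustration only.
            task_id = f"task_{len(self.contents) + 1}"
        if dependencies is None:
            # Chain onto the previously added task only when auto_depend is on.
            if auto_depend and self._last_task_id is not None:
                dependencies = [self._last_task_id]
            else:
                dependencies = []
        self.contents[task_id] = content
        self.dependencies[task_id] = list(dependencies)
        self._last_task_id = task_id
        return self  # returning self is what makes chaining possible


pipeline = (
    ToyPipeline()
    .add("Collect data", task_id="collect")
    .add("Clean data", task_id="clean")  # depends on "collect" automatically
    .add("Draft notes", task_id="notes", auto_depend=False)  # no dependencies
    .add("Write report", task_id="report", dependencies=["clean", "notes"])
)
```

In this model, explicit `dependencies` always win; `auto_depend` only matters when `dependencies` is left as `None`.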

<a id="camel.societies.workforce.workforce.Workforce.add_parallel_pipeline_tasks" />

### add\_parallel\_pipeline\_tasks

```python theme={"system"}
def add_parallel_pipeline_tasks(
    self,
    task_contents: Union[List[str], List[Task]],
    dependencies: Optional[List[str]] = None,
    task_id_prefix: str = 'parallel',
    auto_depend: bool = True
):
```

Add multiple parallel tasks to the pipeline.

Accepts either a list of strings for simple tasks or a list of Task
objects for advanced usage with metadata, images, or custom
configurations.

**Parameters:**

* **task\_contents** (Union\[List\[str], List\[Task]]): List of task content strings or Task objects. If Task objects are provided, task\_id\_prefix is ignored.
* **dependencies** (List\[str], optional): Common dependencies for all parallel tasks. (default: :obj:`None`)
* **task\_id\_prefix** (str, optional): Prefix for generated task IDs. Only used when task\_contents contains strings. (default: :obj:`"parallel"`)
* **auto\_depend** (bool, optional): If True and dependencies is None, automatically depend on the last added task. (default: :obj:`True`)

**Returns:**

Workforce: Self for method chaining.

<a id="camel.societies.workforce.workforce.Workforce.add_sync_pipeline_task" />

### add\_sync\_pipeline\_task

```python theme={"system"}
def add_sync_pipeline_task(
    self,
    content: Union[str, Task],
    wait_for: Optional[List[str]] = None,
    task_id: Optional[str] = None
):
```

Add a synchronization task that waits for multiple tasks.

Accepts either a string for simple tasks or a Task object for
advanced usage with metadata, images, or custom configurations.

**Parameters:**

* **content** (Union\[str, Task]): Content of the synchronization task or a Task object. If a Task object is provided, task\_id parameter is ignored.
* **wait\_for** (List\[str], optional): List of task IDs to wait for. If None, will automatically wait for the last parallel tasks. (default: :obj:`None`)
* **task\_id** (str, optional): ID for the sync task. Only used when content is a string. (default: :obj:`None`)

**Returns:**

Workforce: Self for method chaining.

<a id="camel.societies.workforce.workforce.Workforce.pipeline_fork" />

### pipeline\_fork

```python theme={"system"}
def pipeline_fork(self, task_contents: Union[List[str], List[Task]]):
```

Create parallel branches from the current task.

Accepts either a list of strings for simple tasks or a list of Task
objects for advanced usage with metadata, images, or custom
configurations.

**Parameters:**

* **task\_contents** (Union\[List\[str], List\[Task]]): List of task content strings or Task objects for parallel execution.

**Returns:**

Workforce: Self for method chaining.

<a id="camel.societies.workforce.workforce.Workforce.pipeline_join" />

### pipeline\_join

```python theme={"system"}
def pipeline_join(self, content: Union[str, Task], task_id: Optional[str] = None):
```

Join parallel branches with a synchronization task.

Accepts either a string for simple tasks or a Task object for
advanced usage with metadata, images, or custom configurations.

**Parameters:**

* **content** (Union\[str, Task]): Content of the join/sync task or a Task object. If a Task object is provided, task\_id parameter is ignored.
* **task\_id** (str, optional): ID for the sync task. Only used when content is a string. (default: :obj:`None`)

**Returns:**

Workforce: Self for method chaining.
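
The fork/join pattern can be modeled as dependency bookkeeping: each branch depends on the task that was current before the fork, and the join task waits for every branch. The sketch below is a hypothetical stand-in (`ToyForkJoin` and its attributes are invented for illustration) and works on task IDs only.

```python
from typing import Dict, List


class ToyForkJoin:
    """Illustrative fork/join bookkeeping; not CAMEL's implementation."""

    def __init__(self, root_id: str) -> None:
        self.dependencies: Dict[str, List[str]] = {root_id: []}
        self._current: str = root_id
        self._last_parallel: List[str] = []

    def fork(self, task_ids: List[str]) -> "ToyForkJoin":
        # Every branch depends on the task that was current before the fork.
        for task_id in task_ids:
            self.dependencies[task_id] = [self._current]
        self._last_parallel = list(task_ids)
        return self

    def join(self, task_id: str) -> "ToyForkJoin":
        # The join task waits for all branches of the most recent fork.
        if not self._last_parallel:
            raise ValueError("join() called without a preceding fork()")
        self.dependencies[task_id] = list(self._last_parallel)
        self._current = task_id
        self._last_parallel = []
        return self


flow = ToyForkJoin("fetch").fork(["parse_a", "parse_b"]).join("merge")
```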

<a id="camel.societies.workforce.workforce.Workforce.pipeline_build" />

### pipeline\_build

```python theme={"system"}
def pipeline_build(self):
```

**Returns:**

Workforce: Self for method chaining.

<a id="camel.societies.workforce.workforce.Workforce.get_pipeline_builder" />

### get\_pipeline\_builder

```python theme={"system"}
def get_pipeline_builder(self):
```

**Returns:**

PipelineTaskBuilder: The pipeline builder instance.

<a id="camel.societies.workforce.workforce.Workforce.set_pipeline_tasks" />

### set\_pipeline\_tasks

```python theme={"system"}
def set_pipeline_tasks(self, tasks: List[Task]):
```

Set predefined pipeline tasks for PIPELINE mode.

**Parameters:**

* **tasks** (List\[Task]): List of tasks with dependencies already set. The dependencies should be Task objects in the Task.dependencies attribute.

<a id="camel.societies.workforce.workforce.Workforce._collect_shared_memory" />

### \_collect\_shared\_memory

```python theme={"system"}
def _collect_shared_memory(self):
```

**Returns:**

Dict\[str, List]: A dictionary mapping agent types to their memory
records. Contains entries for 'coordinator', 'task\_agent',
and 'workers'.

<a id="camel.societies.workforce.workforce.Workforce._share_memory_with_agents" />

### \_share\_memory\_with\_agents

```python theme={"system"}
def _share_memory_with_agents(self, shared_memory: Dict[str, List]):
```

Share collected memory with coordinator, task agent, and
SingleAgentWorker instances.

**Parameters:**

* **shared\_memory** (Dict\[str, List]): Memory records collected from all agents to be shared.

<a id="camel.societies.workforce.workforce.Workforce._sync_shared_memory" />

### \_sync\_shared\_memory

```python theme={"system"}
def _sync_shared_memory(self):
```

Synchronize memory across all agents by collecting and sharing.

<a id="camel.societies.workforce.workforce.Workforce._update_dependencies_for_decomposition" />

### \_update\_dependencies\_for\_decomposition

```python theme={"system"}
def _update_dependencies_for_decomposition(self, original_task: Task, subtasks: List[Task]):
```

Update dependency tracking when a task is decomposed into subtasks.
Tasks that depended on the original task should now depend on all
subtasks. The last subtask inherits the original task's dependencies.
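
A minimal sketch of the rewiring rule stated above, operating on a plain dictionary that maps a task ID to the IDs it depends on. The function name and the dictionary layout are assumptions for illustration. The sketch also leaves the original task's own entry in place and gives the non-final subtasks no entry, since the text does not specify either.

```python
from typing import Dict, List


def rewire_after_decomposition(
    task_dependencies: Dict[str, List[str]],
    original_id: str,
    subtask_ids: List[str],
) -> None:
    """Update `task_dependencies` in place after `original_id` is decomposed."""
    if not subtask_ids:
        return
    original_deps = list(task_dependencies.get(original_id, []))
    for task_id, deps in list(task_dependencies.items()):
        if original_id in deps:
            # Swap the original task for all of its subtasks, keeping order.
            rewired = [d for d in deps if d != original_id]
            rewired.extend(s for s in subtask_ids if s not in rewired)
            task_dependencies[task_id] = rewired
    # The last subtask takes over whatever the original task was waiting for.
    task_dependencies[subtask_ids[-1]] = original_deps


deps = {"plan": [], "build": ["plan"], "ship": ["build"]}
rewire_after_decomposition(deps, "build", ["build.1", "build.2"])
```

After the call, `ship` waits for both `build.1` and `build.2`, and `build.2` waits for `plan`.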

<a id="camel.societies.workforce.workforce.Workforce._increment_in_flight_tasks" />

### \_increment\_in\_flight\_tasks

```python theme={"system"}
def _increment_in_flight_tasks(self, task_id: str):
```

Safely increment the in-flight tasks counter with logging.

<a id="camel.societies.workforce.workforce.Workforce._decrement_in_flight_tasks" />

### \_decrement\_in\_flight\_tasks

```python theme={"system"}
def _decrement_in_flight_tasks(self, task_id: str, context: str = ''):
```

Safely decrement the in-flight tasks counter with safety checks.

<a id="camel.societies.workforce.workforce.Workforce._cleanup_task_tracking" />

### \_cleanup\_task\_tracking

```python theme={"system"}
def _cleanup_task_tracking(self, task_id: str):
```

Clean up tracking data for a task to prevent memory leaks.

**Parameters:**

* **task\_id** (str): The ID of the task to clean up.

<a id="camel.societies.workforce.workforce.Workforce._decompose_task" />

### \_decompose\_task

```python theme={"system"}
def _decompose_task(
    self,
    task: Task,
    stream_callback: Optional[Callable[['ChatAgentResponse'], None]] = None
):
```

Decompose the task into subtasks. This method will also set the
relationship between the task and its subtasks.

**Parameters:**

* **task** (Task): The task to decompose.
* **stream\_callback** (Callable\[\[ChatAgentResponse], None], optional): A callback function that receives each chunk (ChatAgentResponse) during streaming decomposition.

**Returns:**

Union\[List\[Task], Generator\[List\[Task], None, None]]:
The subtasks or generator of subtasks. Returns empty list for
PIPELINE mode.

<a id="camel.societies.workforce.workforce.Workforce._get_available_strategies_text" />

### \_get\_available\_strategies\_text

```python theme={"system"}
def _get_available_strategies_text(self):
```

**Returns:**

str: Formatted text describing available strategies for the prompt.

<a id="camel.societies.workforce.workforce.Workforce._analyze_task" />

### \_analyze\_task

```python theme={"system"}
def _analyze_task(self, task: Task):
```

Unified task analysis for both failures and quality evaluation.

This method consolidates the logic for analyzing task failures and
evaluating task quality, using the unified TASK\_ANALYSIS\_PROMPT.

**Parameters:**

* **task** (Task): The task to analyze
* **for\_failure** (bool): True for failure analysis, False for quality evaluation
* **error\_message** (Optional\[str]): Error message, required when for\_failure=True

**Returns:**

TaskAnalysisResult: Unified analysis result with recovery strategy
and optional quality metrics

<a id="camel.societies.workforce.workforce.Workforce.pause" />

### pause

```python theme={"system"}
def pause(self):
```

Pause the workforce execution.

If the internal event-loop is already running, we schedule the
asynchronous pause coroutine onto it.  When the loop has not yet
been created (e.g. the caller presses the hot-key immediately after
workforce start-up) we fall back to a synchronous state change so
that no tasks will be scheduled until the loop is ready.
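
The two code paths described above (schedule onto a running loop, or fall back to a synchronous state change) can be sketched with the standard library alone. `ToyPausable` and its attribute names are hypothetical, and blocking on the returned future is a choice made for this sketch; the source does not say whether the real method waits.

```python
import asyncio
from typing import Optional


class ToyPausable:
    """Illustrative stand-in; attribute names are assumptions, not CAMEL's."""

    def __init__(self) -> None:
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.paused = False

    async def _async_pause(self) -> None:
        self.paused = True

    def pause(self) -> None:
        if self.loop is not None and self.loop.is_running():
            # The loop runs in another thread: hand the coroutine over safely
            # and wait until it has executed.
            future = asyncio.run_coroutine_threadsafe(
                self._async_pause(), self.loop
            )
            future.result(timeout=5)
        else:
            # No loop yet: flip the flag directly so nothing gets scheduled.
            self.paused = True
```

`asyncio.run_coroutine_threadsafe` is the standard way to submit a coroutine to a loop owned by a different thread, which is why it appears in the running-loop branch.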

<a id="camel.societies.workforce.workforce.Workforce.resume" />

### resume

```python theme={"system"}
def resume(self):
```

Resume execution after a manual pause.

<a id="camel.societies.workforce.workforce.Workforce.stop_gracefully" />

### stop\_gracefully

```python theme={"system"}
def stop_gracefully(self):
```

Request that the workforce finish its current in-flight work and then halt.

Works both when the internal event-loop is alive and when it has not
yet been started.  In the latter case we simply mark the stop flag so
that the loop (when it eventually starts) will exit immediately after
initialisation.

<a id="camel.societies.workforce.workforce.Workforce.stop_immediately" />

### stop\_immediately

```python theme={"system"}
def stop_immediately(self):
```

Force-stop without waiting for current tasks to finish.

**Note:**

Child nodes will receive stop signals but may still be cleaning up
when this method returns.

<a id="camel.societies.workforce.workforce.Workforce.skip_gracefully" />

### skip\_gracefully

```python theme={"system"}
def skip_gracefully(self):
```

Request that the workforce skip the current pending tasks and move on to
the next main task in the queue. If no main tasks remain, this behaves
like stop\_gracefully.

This method clears the current pending subtasks and moves to the next
main task in the queue if one is available. It works both when the
internal event-loop is alive and when it has not yet been started.

<a id="camel.societies.workforce.workforce.Workforce.save_snapshot" />

### save\_snapshot

```python theme={"system"}
def save_snapshot(self, description: str = ''):
```

Save current state as a snapshot.

<a id="camel.societies.workforce.workforce.Workforce.list_snapshots" />

### list\_snapshots

```python theme={"system"}
def list_snapshots(self):
```

List all available snapshots.

<a id="camel.societies.workforce.workforce.Workforce.get_pending_tasks" />

### get\_pending\_tasks

```python theme={"system"}
def get_pending_tasks(self):
```

Get current pending tasks for human review.

<a id="camel.societies.workforce.workforce.Workforce.get_completed_tasks" />

### get\_completed\_tasks

```python theme={"system"}
def get_completed_tasks(self):
```

Get completed tasks.

<a id="camel.societies.workforce.workforce.Workforce.modify_task_content" />

### modify\_task\_content

```python theme={"system"}
def modify_task_content(self, task_id: str, new_content: str):
```

Modify the content of a pending task.

<a id="camel.societies.workforce.workforce.Workforce.get_main_task_queue" />

### get\_main\_task\_queue

```python theme={"system"}
def get_main_task_queue(self):
```

**Returns:**

List\[Task]: List of main tasks waiting to be decomposed
and executed.

<a id="camel.societies.workforce.workforce.Workforce.add_task" />

### add\_task

```python theme={"system"}
def add_task(
    self,
    content: str,
    task_id: Optional[str] = None,
    additional_info: Optional[Dict[str, Any]] = None,
    as_subtask: bool = False,
    insert_position: int = -1
):
```

Add a new task to the workforce.

By default, this method adds a main task that will be decomposed into
subtasks. Set `as_subtask=True` to add a task directly to the pending
subtask queue without decomposition.

**Parameters:**

* **content** (str): The content of the task.
* **task\_id** (Optional\[str], optional): Optional ID for the task. If not provided, a unique ID will be generated.
* **additional\_info** (Optional\[Dict\[str, Any]], optional): Optional additional metadata for the task.
* **as\_subtask** (bool, optional): If True, adds the task directly to the pending subtask queue. If False, adds as a main task that will be decomposed. Defaults to False.
* **insert\_position** (int, optional): Position to insert the task in the pending queue. Only applies when as\_subtask=True. Defaults to -1 (append to end).

**Returns:**

Task: The created task object.
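
The `insert_position` semantics for subtasks can be illustrated with a plain `deque` of task IDs. This is a sketch under assumptions: the helper name is hypothetical, and treating any negative or out-of-range position as "append" is a choice made here, since the text only specifies that `-1` appends to the end.

```python
from collections import deque
from typing import Deque


def insert_pending(
    pending: Deque[str], task_id: str, insert_position: int = -1
) -> None:
    """Insert `task_id` into the pending queue; -1 appends to the end."""
    if insert_position < 0 or insert_position >= len(pending):
        # Assumed handling: negative or too-large positions append.
        pending.append(task_id)
    else:
        pending.insert(insert_position, task_id)


queue: Deque[str] = deque(["a", "b"])
insert_pending(queue, "c")            # default: append to the end
insert_pending(queue, "urgent", 0)    # jump to the front of the queue
```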

<a id="camel.societies.workforce.workforce.Workforce.add_main_task" />

### add\_main\_task

```python theme={"system"}
def add_main_task(
    self,
    content: str,
    task_id: Optional[str] = None,
    additional_info: Optional[Dict[str, Any]] = None
):
```

Add a new main task that will be decomposed into subtasks.

This is an alias for :meth:`add_task` with `as_subtask=False`.

**Parameters:**

* **content** (str): The content of the main task.
* **task\_id** (Optional\[str], optional): Optional ID for the task.
* **additional\_info** (Optional\[Dict\[str, Any]], optional): Optional additional metadata.

**Returns:**

Task: The created main task object.

<a id="camel.societies.workforce.workforce.Workforce.add_subtask" />

### add\_subtask

```python theme={"system"}
def add_subtask(
    self,
    content: str,
    task_id: Optional[str] = None,
    additional_info: Optional[Dict[str, Any]] = None,
    insert_position: int = -1
):
```

Add a new subtask to the current pending queue.

This is an alias for :meth:`add_task` with `as_subtask=True`.

**Parameters:**

* **content** (str): The content of the subtask.
* **task\_id** (Optional\[str], optional): Optional ID for the task.
* **additional\_info** (Optional\[Dict\[str, Any]], optional): Optional additional metadata.
* **insert\_position** (int, optional): Position to insert the task. Defaults to -1 (append to end).

**Returns:**

Task: The created subtask object.

<a id="camel.societies.workforce.workforce.Workforce.remove_task" />

### remove\_task

```python theme={"system"}
def remove_task(self, task_id: str):
```

Remove a task from the pending queue or main task queue.

**Parameters:**

* **task\_id** (str): The ID of the task to remove.

**Returns:**

bool: True if task was found and removed, False otherwise.

<a id="camel.societies.workforce.workforce.Workforce.reorder_tasks" />

### reorder\_tasks

```python theme={"system"}
def reorder_tasks(self, task_ids: List[str]):
```

Reorder pending tasks according to the provided task IDs list.

<a id="camel.societies.workforce.workforce.Workforce.resume_from_task" />

### resume\_from\_task

```python theme={"system"}
def resume_from_task(self, task_id: str):
```

Resume execution from a specific task.

<a id="camel.societies.workforce.workforce.Workforce.restore_from_snapshot" />

### restore\_from\_snapshot

```python theme={"system"}
def restore_from_snapshot(self, snapshot_index: int):
```

Restore workforce state from a snapshot.

<a id="camel.societies.workforce.workforce.Workforce.get_workforce_status" />

### get\_workforce\_status

```python theme={"system"}
def get_workforce_status(self):
```

Get current workforce status for human review.

<a id="camel.societies.workforce.workforce.Workforce._collect_pipeline_results" />

### \_collect\_pipeline\_results

```python theme={"system"}
def _collect_pipeline_results(self):
```

Collect results from all completed pipeline tasks.

<a id="camel.societies.workforce.workforce.Workforce._all_pipeline_tasks_successful" />

### \_all\_pipeline\_tasks\_successful

```python theme={"system"}
def _all_pipeline_tasks_successful(self):
```

**Returns:**

bool: True if all tasks completed successfully (DONE state),
False if any tasks failed or are still pending.

<a id="camel.societies.workforce.workforce.Workforce.process_task" />

### process\_task

```python theme={"system"}
def process_task(self, task: Task):
```

Synchronous entry point for processing a task; asynchronous operations
are handled internally.

**Parameters:**

* **task** (Task): The task to be processed.

**Returns:**

Task: The updated task.

<a id="camel.societies.workforce.workforce.Workforce._process_task_with_intervention" />

### \_process\_task\_with\_intervention

```python theme={"system"}
def _process_task_with_intervention(self, task: Task):
```

Process task with human intervention support. This creates and
manages its own event loop to allow for pausing/resuming functionality.

**Parameters:**

* **task** (Task): The task to be processed.

**Returns:**

Task: The updated task.

<a id="camel.societies.workforce.workforce.Workforce.continue_from_pause" />

### continue\_from\_pause

```python theme={"system"}
def continue_from_pause(self):
```

**Returns:**

Optional\[Task]: The completed task if execution finishes, None if
still running/paused.

<a id="camel.societies.workforce.workforce.Workforce._start_child_node_when_paused" />

### \_start\_child\_node\_when\_paused

```python theme={"system"}
def _start_child_node_when_paused(self, start_coroutine: Coroutine):
```

Helper to start a child node when workforce is paused.

**Parameters:**

* **start\_coroutine** (Coroutine): The coroutine to start (e.g., worker\_node.start()).

<a id="camel.societies.workforce.workforce.Workforce.add_single_agent_worker" />

### add\_single\_agent\_worker

```python theme={"system"}
def add_single_agent_worker(
    self,
    description: str,
    worker: ChatAgent,
    pool_max_size: int = DEFAULT_WORKER_POOL_SIZE,
    enable_workflow_memory: bool = False
):
```

Add a worker node to the workforce that uses a single agent.
Can be called while the workforce is paused to add workers dynamically.

**Parameters:**

* **description** (str): Description of the worker node.
* **worker** (ChatAgent): The agent to be added.
* **pool\_max\_size** (int): Maximum size of the agent pool. (default: :obj:`10`)
* **enable\_workflow\_memory** (bool): Whether to enable workflow memory accumulation. Set to True if you plan to call save\_workflow\_memories(). (default: :obj:`False`)

**Returns:**

Workforce: The workforce node itself.

<a id="camel.societies.workforce.workforce.Workforce.add_role_playing_worker" />

### add\_role\_playing\_worker

```python theme={"system"}
def add_role_playing_worker(
    self,
    description: str,
    assistant_role_name: str,
    user_role_name: str,
    assistant_agent_kwargs: Optional[Dict] = None,
    user_agent_kwargs: Optional[Dict] = None,
    summarize_agent_kwargs: Optional[Dict] = None,
    chat_turn_limit: int = 3
):
```

Add a worker node to the workforce that uses the `RolePlaying` system.
Can be called when workforce is paused to dynamically add workers.

**Parameters:**

* **description** (str): Description of the node.
* **assistant\_role\_name** (str): The role name of the assistant agent.
* **user\_role\_name** (str): The role name of the user agent.
* **assistant\_agent\_kwargs** (Optional\[Dict]): The keyword arguments to initialize the assistant agent in the role playing, like the model name, etc. (default: :obj:`None`)
* **user\_agent\_kwargs** (Optional\[Dict]): The keyword arguments to initialize the user agent in the role playing, like the model name, etc. (default: :obj:`None`)
* **summarize\_agent\_kwargs** (Optional\[Dict]): The keyword arguments to initialize the summarize agent, like the model name, etc. (default: :obj:`None`)
* **chat\_turn\_limit** (int): The maximum number of chat turns in the role playing. (default: :obj:`3`)

**Returns:**

Workforce: The workforce node itself.

<a id="camel.societies.workforce.workforce.Workforce.add_workforce" />

### add\_workforce

```python theme={"system"}
def add_workforce(self, workforce: Workforce):
```

Add a workforce node to the workforce.
Can be called when workforce is paused to dynamically add workers.

**Parameters:**

* **workforce** (Workforce): The workforce node to be added.

**Returns:**

Workforce: The workforce node itself.
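Because `add_single_agent_worker`, `add_role_playing_worker`, and `add_workforce` each return the workforce itself, calls can be chained. The sketch below illustrates only that return-self convention. `MiniWorkforce` is a hypothetical stand-in, not part of CAMEL, and its methods take simplified arguments (plain descriptions instead of real agents).

```python
from typing import List


class MiniWorkforce:
    """Hypothetical stand-in that mimics the return-self convention."""

    def __init__(self, description: str) -> None:
        self.description = description
        self.children: List[str] = []

    def add_single_agent_worker(self, description: str) -> "MiniWorkforce":
        self.children.append(f"single:{description}")
        return self  # returning self is what makes chaining possible

    def add_workforce(self, workforce: "MiniWorkforce") -> "MiniWorkforce":
        self.children.append(f"workforce:{workforce.description}")
        return self


# Build a nested structure in one expression per workforce.
research = MiniWorkforce("research team").add_single_agent_worker("web searcher")
root = (
    MiniWorkforce("root")
    .add_single_agent_worker("writer")
    .add_workforce(research)
)
print(root.children)
```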

<a id="camel.societies.workforce.workforce.Workforce.reset" />

### reset

```python theme={"system"}
def reset(self):
```

Reset the workforce and all the child nodes under it. Can only
be called when the workforce is not running.

<a id="camel.societies.workforce.workforce.Workforce.save_workflow_memories" />

### save\_workflow\_memories

```python theme={"system"}
def save_workflow_memories(self):
```

**Returns:**

Dict\[str, str]: Dictionary mapping worker node IDs to save results.
Values are either file paths (success) or error messages
(failure).

**Note:**

For better performance with multiple workers, use the async
version: `results = await workforce.save_workflow_memories_async()`.

**See Also:**

`save_workflow_memories_async`: Async version with parallel
processing for significantly better performance.
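The gain from the async version comes from running the per-worker saves concurrently instead of one after another. The sketch below illustrates that idea with `asyncio.gather`. `fake_save` and the worker IDs are hypothetical placeholders, not CAMEL APIs, and the sleep stands in for a slow save operation.

```python
import asyncio
import time
from typing import Dict, List


async def fake_save(worker_id: str) -> str:
    """Hypothetical per-worker save; the sleep simulates slow I/O."""
    await asyncio.sleep(0.05)
    return f"saved:{worker_id}"


async def save_sequential(worker_ids: List[str]) -> Dict[str, str]:
    # One worker at a time: total time grows with the number of workers.
    results: Dict[str, str] = {}
    for worker_id in worker_ids:
        results[worker_id] = await fake_save(worker_id)
    return results


async def save_parallel(worker_ids: List[str]) -> Dict[str, str]:
    # All saves run concurrently; gather preserves the input order.
    saved = await asyncio.gather(*(fake_save(w) for w in worker_ids))
    return dict(zip(worker_ids, saved))


worker_ids = ["worker-a", "worker-b", "worker-c"]

start = time.perf_counter()
sequential_results = asyncio.run(save_sequential(worker_ids))
sequential_seconds = time.perf_counter() - start

start = time.perf_counter()
parallel_results = asyncio.run(save_parallel(worker_ids))
parallel_seconds = time.perf_counter() - start

print(f"sequential: {sequential_seconds:.2f}s, parallel: {parallel_seconds:.2f}s")
```

Both variants return the same mapping. Only the elapsed time differs.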

<a id="camel.societies.workforce.workforce.Workforce.load_workflow_memories" />

### load\_workflow\_memories

```python theme={"system"}
def load_workflow_memories(
    self,
    session_id: Optional[str] = None,
    worker_max_workflows: int = 3,
    coordinator_max_workflows: int = 5,
    task_agent_max_workflows: int = 3
):
```

Load workflow memories for all SingleAgentWorker instances in the
workforce.

This method iterates through all child workers and loads relevant
workflow files for SingleAgentWorker instances using their
load\_workflow\_memories() method. Workers match files based on their
description names.

**Parameters:**

* **session\_id** (Optional\[str]): Specific workforce session ID to load from. If None, searches across all sessions. (default: :obj:`None`)
* **worker\_max\_workflows** (int): Maximum number of workflow files to load per worker agent. (default: :obj:`3`)
* **coordinator\_max\_workflows** (int): Maximum number of workflow files to load for the coordinator agent. (default: :obj:`5`)
* **task\_agent\_max\_workflows** (int): Maximum number of workflow files to load for the task planning agent. (default: :obj:`3`)

**Returns:**

Dict\[str, bool]: Dictionary mapping worker node IDs to load success
status. True indicates successful loading, False indicates failure.

<a id="camel.societies.workforce.workforce.Workforce._load_management_agent_workflows" />

### \_load\_management\_agent\_workflows

```python theme={"system"}
def _load_management_agent_workflows(
    self,
    coordinator_max_workflows: int,
    task_agent_max_workflows: int,
    session_id: Optional[str] = None
):
```

Load workflow summaries for coordinator and task planning agents.

This method loads aggregated workflow summaries to help:

* Coordinator agent: understand task assignment patterns and worker
  capabilities
* Task agent: understand task decomposition patterns and
  successful strategies

**Parameters:**

* **coordinator\_max\_workflows** (int): Maximum number of workflow files to load for the coordinator agent.
* **task\_agent\_max\_workflows** (int): Maximum number of workflow files to load for the task planning agent.
* **session\_id** (Optional\[str]): Specific session ID to load from. If None, searches across all sessions.

<a id="camel.societies.workforce.workforce.Workforce.set_channel" />

### set\_channel

```python theme={"system"}
def set_channel(self, channel: TaskChannel):
```

Set the channel for the node and all the child nodes under it.

<a id="camel.societies.workforce.workforce.Workforce._get_child_nodes_info" />

### \_get\_child\_nodes\_info

```python theme={"system"}
def _get_child_nodes_info(self):
```

Get the information of all the child nodes under this node.

<a id="camel.societies.workforce.workforce.Workforce._get_node_info" />

### \_get\_node\_info

```python theme={"system"}
def _get_node_info(self, node):
```

Get descriptive information for a specific node type.

<a id="camel.societies.workforce.workforce.Workforce._get_single_agent_toolkit_info" />

### \_get\_single\_agent\_toolkit\_info

```python theme={"system"}
def _get_single_agent_toolkit_info(self, worker: 'SingleAgentWorker'):
```

Get formatted information for a SingleAgentWorker node.

<a id="camel.societies.workforce.workforce.Workforce._group_tools_by_toolkit" />

### \_group\_tools\_by\_toolkit

```python theme={"system"}
def _group_tools_by_toolkit(self, tool_dict: dict):
```

Group tools by their parent toolkit class names.
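A minimal sketch of this kind of grouping, assuming each tool is a plain callable and that a tool's "parent toolkit" is the class of the object a bound method belongs to. The real `tool_dict` values, the fallback label `"Standalone"`, and the classes and functions below are assumptions made for illustration.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class DemoSearchToolkit:
    """Hypothetical toolkit whose bound methods act as tools."""

    def search_web(self, query: str) -> str:
        return f"results for {query}"

    def search_wiki(self, query: str) -> str:
        return f"wiki entry for {query}"


def add(a: int, b: int) -> int:
    """Hypothetical standalone tool that belongs to no toolkit."""
    return a + b


def group_tools_by_toolkit(tool_dict: Dict[str, Callable]) -> Dict[str, List[str]]:
    groups: Dict[str, List[str]] = defaultdict(list)
    for name, func in tool_dict.items():
        # Bound methods expose their owning instance through __self__.
        owner = getattr(func, "__self__", None)
        key = type(owner).__name__ if owner is not None else "Standalone"
        groups[key].append(name)
    return dict(groups)


toolkit = DemoSearchToolkit()
grouped = group_tools_by_toolkit(
    {
        "search_web": toolkit.search_web,
        "search_wiki": toolkit.search_wiki,
        "add": add,
    }
)
print(grouped)
```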

<a id="camel.societies.workforce.workforce.Workforce._get_valid_worker_ids" />

### \_get\_valid\_worker\_ids

```python theme={"system"}
def _get_valid_worker_ids(self):
```

**Returns:**

set: Set of valid worker IDs that can be assigned tasks.

<a id="camel.societies.workforce.workforce.Workforce._call_coordinator_for_assignment" />

### \_call\_coordinator\_for\_assignment

```python theme={"system"}
def _call_coordinator_for_assignment(self, tasks: List[Task], invalid_ids: Optional[List[str]] = None):
```

Call coordinator agent to assign tasks with optional validation
feedback in the case of invalid worker IDs.

**Parameters:**

* **tasks** (List\[Task]): Tasks to assign.
* **invalid\_ids** (Optional\[List\[str]]): Invalid worker IDs from previous attempt (if any). (default: :obj:`None`)

**Returns:**

TaskAssignResult: Assignment result from coordinator.

<a id="camel.societies.workforce.workforce.Workforce._validate_assignments" />

### \_validate\_assignments

```python theme={"system"}
def _validate_assignments(self, assignments: List[TaskAssignment], valid_ids: Set[str]):
```

Validate task assignments against valid worker IDs.

**Parameters:**

* **assignments** (List\[TaskAssignment]): Assignments to validate.
* **valid\_ids** (Set\[str]): Set of valid worker IDs.

**Returns:**

Tuple\[List\[TaskAssignment], List\[TaskAssignment]]:
(valid\_assignments, invalid\_assignments)
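The validation described here amounts to partitioning the assignments by whether their assignee is in the set of valid worker IDs. The following is a minimal sketch of that partition. `MiniAssignment` is a hypothetical stand-in for `TaskAssignment`, and its field names are assumptions.

```python
from dataclasses import dataclass
from typing import List, Set, Tuple


@dataclass
class MiniAssignment:
    """Hypothetical stand-in for TaskAssignment (field names assumed)."""

    task_id: str
    assignee_id: str


def validate_assignments(
    assignments: List[MiniAssignment], valid_ids: Set[str]
) -> Tuple[List[MiniAssignment], List[MiniAssignment]]:
    valid: List[MiniAssignment] = []
    invalid: List[MiniAssignment] = []
    for assignment in assignments:
        # Membership in valid_ids decides which list the assignment joins.
        if assignment.assignee_id in valid_ids:
            valid.append(assignment)
        else:
            invalid.append(assignment)
    return valid, invalid


assignments = [
    MiniAssignment("task-1", "worker-a"),
    MiniAssignment("task-2", "worker-x"),  # unknown worker ID
    MiniAssignment("task-3", "worker-b"),
]
valid, invalid = validate_assignments(assignments, {"worker-a", "worker-b"})
print([a.task_id for a in valid], [a.task_id for a in invalid])
```

The IDs collected from the invalid list are the kind of feedback that `invalid_ids` in `_call_coordinator_for_assignment` is meant to carry into a retry.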

<a id="camel.societies.workforce.workforce.Workforce._update_task_dependencies_from_assignments" />

### \_update\_task\_dependencies\_from\_assignments

```python theme={"system"}
def _update_task_dependencies_from_assignments(self, assignments: List[TaskAssignment], tasks: List[Task]):
```

Update Task.dependencies with actual Task objects based on
assignments.

**Parameters:**

* **assignments** (List\[TaskAssignment]): The task assignments containing dependency IDs.
* **tasks** (List\[Task]): The tasks that were assigned.
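Resolving dependency IDs into task objects can be sketched with an ID lookup table. `MiniTask` and `MiniAssignment` below are hypothetical stand-ins for `Task` and `TaskAssignment`, and silently skipping unknown dependency IDs is an assumption of this sketch, not documented behavior.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class MiniTask:
    """Hypothetical stand-in for Task."""

    id: str
    dependencies: List["MiniTask"] = field(default_factory=list)


@dataclass
class MiniAssignment:
    """Hypothetical stand-in for TaskAssignment (field names assumed)."""

    task_id: str
    assignee_id: str
    dependencies: List[str] = field(default_factory=list)


def update_dependencies(
    assignments: List[MiniAssignment], tasks: List[MiniTask]
) -> None:
    task_by_id = {task.id: task for task in tasks}
    for assignment in assignments:
        task = task_by_id.get(assignment.task_id)
        if task is None:
            continue  # assignment refers to a task outside this batch
        # Replace dependency IDs with the task objects they name.
        task.dependencies = [
            task_by_id[dep_id]
            for dep_id in assignment.dependencies
            if dep_id in task_by_id  # assumption: unknown IDs are skipped
        ]


tasks = [MiniTask("t1"), MiniTask("t2"), MiniTask("t3")]
assignments = [
    MiniAssignment("t1", "w1"),
    MiniAssignment("t2", "w2", dependencies=["t1"]),
    MiniAssignment("t3", "w1", dependencies=["t1", "t2", "missing"]),
]
update_dependencies(assignments, tasks)
print([[dep.id for dep in task.dependencies] for task in tasks])
```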

<a id="camel.societies.workforce.workforce.Workforce.get_workforce_log_tree" />

### get\_workforce\_log\_tree

```python theme={"system"}
def get_workforce_log_tree(self):
```

Returns an ASCII tree representation of the task hierarchy and
worker status.

<a id="camel.societies.workforce.workforce.Workforce.get_workforce_kpis" />

### get\_workforce\_kpis

```python theme={"system"}
def get_workforce_kpis(self):
```

Returns a dictionary of key performance indicators.

<a id="camel.societies.workforce.workforce.Workforce.dump_workforce_logs" />

### dump\_workforce\_logs

```python theme={"system"}
def dump_workforce_logs(self, file_path: str):
```

Dumps all collected logs to a JSON file.

**Parameters:**

* **file\_path** (str): The path to the JSON file.

<a id="camel.societies.workforce.workforce.Workforce._submit_coro_to_loop" />

### \_submit\_coro\_to\_loop

```python theme={"system"}
def _submit_coro_to_loop(self, coro: 'Coroutine'):
```

Thread-safe submission of coroutine to the workforce loop.
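The standard-library tool for handing a coroutine to an event loop that runs in another thread is `asyncio.run_coroutine_threadsafe`. The sketch below shows that general pattern. Whether this method uses exactly this call internally is an assumption, and `double` is a hypothetical coroutine.

```python
import asyncio
import threading


async def double(value: int) -> int:
    """Hypothetical coroutine to run on the background loop."""
    await asyncio.sleep(0.01)
    return value * 2


# Run an event loop in a background thread.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# Submit from the main thread; a concurrent.futures.Future comes back.
future = asyncio.run_coroutine_threadsafe(double(21), loop)
result = future.result(timeout=5)  # block until the coroutine finishes
print(result)

# Shut the loop down from outside its thread.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```

Calling loop methods directly from another thread is not safe, which is why both the submission and the shutdown go through the `*_threadsafe` helpers.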

<a id="camel.societies.workforce.workforce.Workforce.stop" />

### stop

```python theme={"system"}
def stop(self):
```

Forcefully stop the workforce and its children immediately.

This method now stops immediately; it previously performed a graceful
lifecycle cleanup. It cancels child listeners, clears pending and
in-flight tasks, and sets the state to STOPPED without waiting for
active work to finish.
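The difference between an immediate stop and a graceful one can be sketched with plain `asyncio` tasks: the listeners are cancelled rather than awaited to completion, and queued work is dropped. The names below (`listener`, `pending`, the `"STOPPED"` string) are hypothetical and only model the behavior described above.

```python
import asyncio
from typing import Dict, List


async def listener(name: str, finished: List[str]) -> None:
    """Hypothetical child listener that would otherwise run for a long time."""
    await asyncio.sleep(60)
    finished.append(name)  # never reached when the listener is cancelled


async def main() -> Dict[str, object]:
    finished: List[str] = []
    pending = ["task-1", "task-2"]
    children = [
        asyncio.create_task(listener(f"child-{i}", finished)) for i in range(3)
    ]
    await asyncio.sleep(0)  # let the listeners start

    # Immediate stop: cancel the listeners and drop queued work.
    for child in children:
        child.cancel()
    pending.clear()
    state = "STOPPED"

    # Collect the cancellations so no task is left dangling.
    await asyncio.gather(*children, return_exceptions=True)
    return {
        "state": state,
        "all_cancelled": all(child.cancelled() for child in children),
        "finished": finished,
        "pending": pending,
    }


outcome = asyncio.run(main())
print(outcome)
```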

<a id="camel.societies.workforce.workforce.Workforce.clone" />

### clone

```python theme={"system"}
def clone(self, with_memory: bool = False):
```

Creates a new instance of Workforce with the same configuration.

**Parameters:**

* **with\_memory** (bool, optional): Whether to copy the memory (conversation history) to the new instance. If True, the new instance will have the same conversation history. If False, the new instance will have a fresh memory. (default: :obj:`False`)

**Returns:**

Workforce: A new instance of Workforce with the same configuration.

<a id="camel.societies.workforce.workforce.Workforce.to_mcp" />

### to\_mcp

```python theme={"system"}
def to_mcp(
    self,
    name: str = 'CAMEL-Workforce',
    description: str = 'A workforce system using the CAMEL AI framework for multi-agent collaboration.',
    dependencies: Optional[List[str]] = None,
    host: str = 'localhost',
    port: int = 8001
):
```

Expose this Workforce as an MCP server.

**Parameters:**

* **name** (str): Name of the MCP server. (default: :obj:`CAMEL-Workforce`)
* **description** (str): Description of the workforce. If None, a generic description is used. (default: :obj:`A workforce system using the CAMEL AI framework for multi-agent collaboration.`)
* **dependencies** (Optional\[List\[str]]): Additional dependencies for the MCP server. (default: :obj:`None`)
* **host** (str): Host to bind to for HTTP transport. (default: :obj:`localhost`)
* **port** (int): Port to bind to for HTTP transport. (default: :obj:`8001`)

**Returns:**

FastMCP: An MCP server instance that can be run.
