WorkforceState
WorkforceSnapshot
init
Workforce
- Coordinator Agent: Assigns tasks to workers based on their capabilities
- Task Planner Agent: Decomposes complex tasks and composes results
- Dynamic Workers: Created at runtime when tasks fail repeatedly
- description (str): Description of the workforce.
- children (Optional[List[BaseNode]], optional): List of child nodes under this node. Each child node can be a worker node or another workforce node. (default: :obj:
None
) - coordinator_agent (Optional[ChatAgent], optional): A custom coordinator agent instance for task assignment and worker creation. If provided, the workforce will create a new agent using this agent’s model configuration but with the required system message and functionality. If None, a default agent will be created using DEFAULT model settings. (default: :obj:
None
) - task_agent (Optional[ChatAgent], optional): A custom task planning agent instance for task decomposition and composition. If provided, the workforce will create a new agent using this agent’s model configuration but with the required system message and tools (TaskPlanningToolkit). If None, a default agent will be created using DEFAULT model settings. (default: :obj:
None
) - new_worker_agent (Optional[ChatAgent], optional): A template agent for workers created dynamically at runtime when existing workers cannot handle failed tasks. If None, workers will be created with default settings including SearchToolkit, CodeExecutionToolkit, and ThinkingToolkit. (default: :obj:
None
) - graceful_shutdown_timeout (float, optional): The timeout in seconds for graceful shutdown when a task fails 3 times. During this period, the workforce remains active for debugging. Set to 0 for immediate shutdown. (default: :obj:
15.0
) - task_timeout_seconds (Optional[float], optional): The timeout in seconds for waiting for tasks to be returned by workers. If None, uses the global TASK_TIMEOUT_SECONDS value (600.0 seconds). Increase this value for tasks that require more processing time. (default: :obj:
None
) - share_memory (bool, optional): Whether to enable shared memory across SingleAgentWorker instances in the workforce. When enabled, all SingleAgentWorker instances, coordinator agent, and task planning agent will share their complete conversation history and function-calling trajectory, providing better context for task handoffs and continuity. Note: Currently only supports SingleAgentWorker instances; RolePlayingWorker and nested Workforce instances do not participate in memory sharing. (default: :obj:
False
) - use_structured_output_handler (bool, optional): Whether to use the structured output handler instead of native structured output. When enabled, the workforce will use prompts with structured output instructions and regex extraction to parse responses. This ensures compatibility with agents that don’t reliably support native structured output. When disabled, the workforce uses the native response_format parameter. (default: :obj:
True
) - callbacks (Optional[List[WorkforceCallback]], optional): A list of callback handlers to observe and record workforce lifecycle events and metrics (e.g., task creation/assignment/start/completion/ failure, worker creation/deletion, all-tasks-completed). All items must be instances of :class:
WorkforceCallback
, otherwise - a: class:
ValueError
is raised. If none of the provided callbacks implement :class:WorkforceMetrics
, a built-in :class:WorkforceLogger
(implements both callback and metrics) is added automatically. If at least one provided callback - implements: class:
WorkforceMetrics
, no default logger is added. (default: :obj:None
)
init
_initialize_callbacks
_notify_worker_created
_get_or_create_shared_context_utility
- session_id (Optional[str]): Custom session ID to use. If None, auto-generates a timestamped session ID. (default: :obj:
None
)
_validate_agent_compatibility
- agent (ChatAgent): The agent to validate.
- agent_context (str): Context description for error messages.
_attach_pause_event_to_agent
_ensure_pause_event_in_kwargs
repr
_collect_shared_memory
_share_memory_with_agents
- shared_memory (Dict[str, List]): Memory records collected from all agents to be shared.
_sync_shared_memory
_update_dependencies_for_decomposition
_increment_in_flight_tasks
_decrement_in_flight_tasks
_cleanup_task_tracking
- task_id (str): The ID of the task to clean up.
_decompose_task
_analyze_task
- task (Task): The task to analyze
- for_failure (bool): True for failure analysis, False for quality evaluation
- error_message (Optional[str]): Error message, required when for_failure=True
pause
resume
stop_gracefully
skip_gracefully
save_snapshot
list_snapshots
get_pending_tasks
get_completed_tasks
modify_task_content
add_task
as_subtask=True
to add a task directly to the pending
subtask queue without decomposition.
Parameters:
- content (str): The content of the task.
- task_id (Optional[str], optional): Optional ID for the task. If not provided, a unique ID will be generated.
- additional_info (Optional[Dict[str, Any]], optional): Optional additional metadata for the task.
- as_subtask (bool, optional): If True, adds the task directly to the pending subtask queue. If False, adds as a main task that will be decomposed. Defaults to False.
- insert_position (int, optional): Position to insert the task in the pending queue. Only applies when as_subtask=True. Defaults to -1 (append to end).
add_main_task
add_task
with as_subtask=False
.
Parameters:
- content (str): The content of the main task.
- task_id (Optional[str], optional): Optional ID for the task.
- additional_info (Optional[Dict[str, Any]], optional): Optional additional metadata.
add_subtask
add_task
with as_subtask=True
.
Parameters:
- content (str): The content of the subtask.
- task_id (Optional[str], optional): Optional ID for the task.
- additional_info (Optional[Dict[str, Any]], optional): Optional additional metadata.
- insert_position (int, optional): Position to insert the task. Defaults to -1 (append to end).
remove_task
- task_id (str): The ID of the task to remove.
reorder_tasks
resume_from_task
restore_from_snapshot
get_workforce_status
process_task
- task (Task): The task to be processed.
_process_task_with_intervention
- task (Task): The task to be processed.
continue_from_pause
_start_child_node_when_paused
- start_coroutine: The coroutine to start (e.g., worker_node.start())
add_single_agent_worker
- description (str): Description of the worker node.
- worker (ChatAgent): The agent to be added.
- pool_max_size (int): Maximum size of the agent pool. (default: :obj:
10
) - enable_workflow_memory (bool): Whether to enable workflow memory accumulation. Set to True if you plan to call save_workflow_memories(). (default: :obj:
False
)
add_role_playing_worker
RolePlaying
system.
Can be called when workforce is paused to dynamically add workers.
Parameters:
- description (str): Description of the node.
- assistant_role_name (str): The role name of the assistant agent.
- user_role_name (str): The role name of the user agent.
- assistant_agent_kwargs (Optional[Dict]): The keyword arguments to initialize the assistant agent in the role playing, like the model name, etc. (default: :obj:
None
) - user_agent_kwargs (Optional[Dict]): The keyword arguments to initialize the user agent in the role playing, like the model name, etc. (default: :obj:
None
) - summarize_agent_kwargs (Optional[Dict]): The keyword arguments to initialize the summarize agent, like the model name, etc. (default: :obj:
None
) - chat_turn_limit (int): The maximum number of chat turns in the role playing. (default: :obj:
3
)
add_workforce
- workforce (Workforce): The workforce node to be added.
reset
save_workflow_memories
save_workflow_memories_async
instead for parallel processing and significantly better
performance.
This method iterates through all child workers and triggers workflow
saving for SingleAgentWorker instances using their
save_workflow_memories()
method.
Other worker types are skipped.
Parameters:
- session_id (Optional[str]): Custom session ID to use for saving workflows. If None, auto-generates a timestamped session ID. Useful for organizing workflows by project or context. (default: :obj:
None
)
save_workflow_memories_async
: Async version with parallel
processing for significantly better performance.
load_workflow_memories
- session_id (Optional[str]): Specific workforce session ID to load from. If None, searches across all sessions. (default: :obj:
None
) - worker_max_workflows (int): Maximum number of workflow files to load per worker agent. (default: :obj:
3
) - coordinator_max_workflows (int): Maximum number of workflow files to load for the coordinator agent. (default: :obj:
5
) - task_agent_max_workflows (int): Maximum number of workflow files to load for the task planning agent. (default: :obj:
3
)
_load_management_agent_workflows
- Coordinator agent: understand task assignment patterns and worker capabilities
- Task agent: understand task decomposition patterns and successful strategies
- coordinator_max_workflows (int): Maximum number of workflow files to load for the coordinator agent.
- task_agent_max_workflows (int): Maximum number of workflow files to load for the task planning agent.
- session_id (Optional[str]): Specific session ID to load from. If None, searches across all sessions.
set_channel
_get_child_nodes_info
_get_node_info
_get_single_agent_toolkit_info
_group_tools_by_toolkit
_get_valid_worker_ids
_call_coordinator_for_assignment
- tasks (List[Task]): Tasks to assign.
- invalid_ids (List[str], optional): Invalid worker IDs from previous attempt (if any).
_validate_assignments
- assignments (List[TaskAssignment]): Assignments to validate.
- valid_ids (Set[str]): Set of valid worker IDs.
_update_task_dependencies_from_assignments
- assignments (List[TaskAssignment]): The task assignments containing dependency IDs.
- tasks (List[Task]): The tasks that were assigned.
get_workforce_log_tree
get_workforce_kpis
dump_workforce_logs
- file_path (str): The path to the JSON file.
_submit_coro_to_loop
stop
clone
- with_memory (bool, optional): Whether to copy the memory (conversation history) to the new instance. If True, the new instance will have the same conversation history. If False, the new instance will have a fresh memory. (default: :obj:
False
)
to_mcp
- name (str): Name of the MCP server. (default: :obj:
CAMEL-Workforce
) - description (str): Description of the workforce. If None, a generic description is used. (default: :obj:
A workforce system using the CAMEL AI framework for multi-agent collaboration.
) - dependencies (Optional[List[str]]): Additional dependencies for the MCP server. (default: :obj:
None
) - host (str): Host to bind to for HTTP transport. (default: :obj:
localhost
) - port (int): Port to bind to for HTTP transport. (default: :obj:
8001
)