# camel.societies.workforce.utils

<a id="camel.societies.workforce.utils" />

<a id="camel.societies.workforce.utils.is_generic_role_name" />

## is\_generic\_role\_name

```python theme={"system"}
def is_generic_role_name(role_name: str):
```

Check if a role name is generic and should trigger fallback logic.

Generic role names are common, non-specific identifiers that don't
provide meaningful information about an agent's actual purpose.
When a role name is generic, fallback logic should be used to find
a more specific identifier (e.g., from LLM-generated agent\_title
or description).

**Parameters:**

* **role\_name** (str): The role name to check (will be converted to lowercase for case-insensitive comparison).

**Returns:**

bool: True if the role name is generic, False otherwise.
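
The check and the fallback it triggers can be sketched as follows. This is a minimal illustration, not the library's implementation: the contents of `GENERIC_ROLE_NAMES` and the helper `pick_identifier` are hypothetical, and the real list of generic names in CAMEL may differ.

```python
from typing import Optional

# Hypothetical set of generic names; the real list in CAMEL may differ.
GENERIC_ROLE_NAMES = {"assistant", "agent", "user", "system"}


def is_generic_role_name_sketch(role_name: str) -> bool:
    """Return True if role_name matches a generic name, ignoring case."""
    return role_name.lower() in GENERIC_ROLE_NAMES


def pick_identifier(role_name: str, agent_title: Optional[str]) -> str:
    """Hypothetical fallback: prefer agent_title when the role is generic."""
    if is_generic_role_name_sketch(role_name) and agent_title:
        return agent_title
    return role_name


print(pick_identifier("Assistant", "Market Researcher"))
print(pick_identifier("data_analyst", "Market Researcher"))
```

A generic name with no alternative available is returned unchanged in this sketch; how the library handles that case is not stated above.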

<a id="camel.societies.workforce.utils.WorkflowMetadata" />

## WorkflowMetadata

```python theme={"system"}
class WorkflowMetadata(BaseModel):
```

Pydantic model for workflow metadata tracking.

This model defines the formal schema for workflow metadata that tracks
versioning, timestamps, and contextual information about saved workflows.
Used to maintain workflow history and enable proper version management.

<a id="camel.societies.workforce.utils.WorkflowConfig" />

## WorkflowConfig

```python theme={"system"}
class WorkflowConfig(BaseModel):
```

Configuration for workflow memory management.

Centralizes all workflow-related configuration options to avoid scattered
settings across multiple files and methods.

<a id="camel.societies.workforce.utils.WorkerConf" />

## WorkerConf

```python theme={"system"}
class WorkerConf(BaseModel):
```

The configuration of a worker.

<a id="camel.societies.workforce.utils.TaskResult" />

## TaskResult

```python theme={"system"}
class TaskResult(BaseModel):
```

The result of a task.

<a id="camel.societies.workforce.utils.QualityEvaluation" />

## QualityEvaluation

```python theme={"system"}
class QualityEvaluation(BaseModel):
```

Quality evaluation result for a completed task.

**Deprecated:** Use `TaskAnalysisResult` instead. This class is kept for
backward compatibility.

<a id="camel.societies.workforce.utils.TaskAssignment" />

## TaskAssignment

```python theme={"system"}
class TaskAssignment(BaseModel):
```

An individual task assignment within a batch.

<a id="camel.societies.workforce.utils.TaskAssignment._split_and_strip" />

### \_split\_and\_strip

```python theme={"system"}
def _split_and_strip(dep_str: str):
```

Utility to split a comma-separated string and strip whitespace from
each part.
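
A minimal sketch of this kind of helper is shown below. The function name `split_and_strip` is a stand-in, and dropping empty entries (for example from a trailing comma) is an assumption rather than documented behavior.

```python
from typing import List


def split_and_strip(dep_str: str) -> List[str]:
    """Split on commas and trim whitespace around each part."""
    # Assumption: empty parts (e.g. from "a,,b" or "") are discarded.
    return [part.strip() for part in dep_str.split(",") if part.strip()]


print(split_and_strip(" task_1, task_2 ,task_3 "))
```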

<a id="camel.societies.workforce.utils.TaskAssignment.validate_dependencies" />

### validate\_dependencies

```python theme={"system"}
def validate_dependencies(cls, v):
```

<a id="camel.societies.workforce.utils.TaskAssignResult" />

## TaskAssignResult

```python theme={"system"}
class TaskAssignResult(BaseModel):
```

The result of task assignment for both single and batch
assignments.

<a id="camel.societies.workforce.utils.RecoveryStrategy" />

## RecoveryStrategy

```python theme={"system"}
class RecoveryStrategy(str, Enum):
```

Strategies for handling failed tasks.

<a id="camel.societies.workforce.utils.RecoveryStrategy.__str__" />

### \_\_str\_\_

```python theme={"system"}
def __str__(self):
```

<a id="camel.societies.workforce.utils.RecoveryStrategy.__repr__" />

### \_\_repr\_\_

```python theme={"system"}
def __repr__(self):
```

<a id="camel.societies.workforce.utils.FailureHandlingConfig" />

## FailureHandlingConfig

```python theme={"system"}
class FailureHandlingConfig(BaseModel):
```

Configuration for failure handling behavior in Workforce.

This configuration lets users customize how the Workforce handles task
failures, for example by disabling reassignment or other recovery
strategies.

**Parameters:**

* **max\_retries** (int): Maximum number of retry attempts before giving up on a task. (default: :obj:`3`)
* **enabled\_strategies** (Optional\[List\[RecoveryStrategy]]): List of recovery strategies that are allowed to be used. Can be specified as RecoveryStrategy enums or strings (e.g., \["retry", "replan"]). If None, all strategies are enabled (with LLM analysis). If an empty list, no recovery strategies are applied and failed tasks are marked as failed immediately. If only \["retry"] is specified, simple retry is used without LLM analysis. (default: :obj:`None` - all strategies enabled)
* **halt\_on\_max\_retries** (bool): Whether to halt the entire workforce when a task exceeds max retries. If False, the task is marked as failed and the workflow continues (similar to PIPELINE mode behavior). (default: :obj:`True` for AUTO\_DECOMPOSE mode behavior)
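
The three cases for `enabled_strategies` described above (None, an empty list, and only `"retry"`) can be sketched with the standard library alone. `Strategy` is a stand-in enum containing only the two values named in the text, and `normalize` and `describe` are hypothetical helpers, not part of CAMEL.

```python
from enum import Enum
from typing import List, Optional, Sequence, Union


class Strategy(str, Enum):
    # Stand-in for RecoveryStrategy; only the values named in the text.
    RETRY = "retry"
    REPLAN = "replan"


def normalize(
    strategies: Optional[Sequence[Union[str, Strategy]]],
) -> Optional[List[Strategy]]:
    """Convert strings to enum members; None passes through unchanged."""
    if strategies is None:
        return None
    return [Strategy(s) for s in strategies]


def describe(strategies: Optional[Sequence[Union[str, Strategy]]]) -> str:
    """Summarize which documented case a given setting falls into."""
    enabled = normalize(strategies)
    if enabled is None:
        return "all strategies, with LLM analysis"
    if not enabled:
        return "no recovery, fail immediately"
    if enabled == [Strategy.RETRY]:
        return "simple retry, no LLM analysis"
    return "only: " + ", ".join(s.value for s in enabled)


for setting in (None, [], ["retry"], ["retry", Strategy.REPLAN]):
    print(setting, "->", describe(setting))
```

An unknown string such as `"restart"` raises `ValueError` in this sketch, because `Strategy("restart")` has no matching member.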

<a id="camel.societies.workforce.utils.FailureHandlingConfig.validate_enabled_strategies" />

### validate\_enabled\_strategies

```python theme={"system"}
def validate_enabled_strategies(cls, v):
```

Convert string list to RecoveryStrategy enum list.

<a id="camel.societies.workforce.utils.FailureContext" />

## FailureContext

```python theme={"system"}
class FailureContext(BaseModel):
```

Context information about a task failure.

<a id="camel.societies.workforce.utils.TaskAnalysisResult" />

## TaskAnalysisResult

```python theme={"system"}
class TaskAnalysisResult(BaseModel):
```

Unified result for task failure analysis and quality evaluation.

This model combines both failure recovery decisions and quality evaluation
results into a single structure. For failure analysis, only the recovery
strategy and reasoning fields are populated. For quality evaluation, all
fields including quality\_score and issues are populated.

<a id="camel.societies.workforce.utils.TaskAnalysisResult.is_quality_evaluation" />

### is\_quality\_evaluation

```python theme={"system"}
def is_quality_evaluation(self):
```

**Returns:**

bool: True if this is a quality evaluation (has quality\_score),
False if this is a failure analysis.

<a id="camel.societies.workforce.utils.TaskAnalysisResult.quality_sufficient" />

### quality\_sufficient

```python theme={"system"}
def quality_sufficient(self):
```

**Returns:**

bool: True if quality is sufficient (score `>= 70` and no recovery
strategy recommended), False otherwise. Always False for
failure analysis results.
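
The two checks can be illustrated with a small stand-in class. `AnalysisSketch` and its field names other than `quality_score` are assumptions for illustration. The threshold of 70 and the "no recovery strategy" condition come from the description above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AnalysisSketch:
    """Stand-in for TaskAnalysisResult; field names are illustrative."""

    reasoning: str
    recovery_strategy: Optional[str] = None
    quality_score: Optional[int] = None

    def is_quality_evaluation(self) -> bool:
        # A result counts as a quality evaluation once it carries a score.
        return self.quality_score is not None

    def quality_sufficient(self) -> bool:
        # Requires a score of at least 70 and no recommended recovery.
        return (
            self.quality_score is not None
            and self.quality_score >= 70
            and self.recovery_strategy is None
        )


good = AnalysisSketch("Meets requirements", quality_score=85)
weak = AnalysisSketch("Missing sections", "replan", quality_score=85)
failure = AnalysisSketch("Tool call timed out", "retry")
print(good.quality_sufficient(), weak.quality_sufficient())
print(failure.is_quality_evaluation(), failure.quality_sufficient())
```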

<a id="camel.societies.workforce.utils.PipelineTaskBuilder" />

## PipelineTaskBuilder

```python theme={"system"}
class PipelineTaskBuilder:
```

Helper class for building pipeline tasks with dependencies.

<a id="camel.societies.workforce.utils.PipelineTaskBuilder.__init__" />

### \_\_init\_\_

```python theme={"system"}
def __init__(self):
```

Initialize an empty pipeline task builder.

<a id="camel.societies.workforce.utils.PipelineTaskBuilder.add" />

### add

```python theme={"system"}
def add(
    self,
    content: str,
    task_id: Optional[str] = None,
    dependencies: Optional[List[str]] = None,
    additional_info: Optional[dict] = None,
    auto_depend: bool = True
):
```

Add a task to the pipeline with support for chaining.

**Parameters:**

* **content** (str): The content/description of the task.
* **task\_id** (str, optional): Unique identifier for the task. If None, a unique ID will be generated. (default: :obj:`None`)
* **dependencies** (List\[str], optional): List of task IDs that this task depends on. If None and auto\_depend=True, will depend on the last added task. (default: :obj:`None`)
* **additional\_info** (dict, optional): Additional information for the task. (default: :obj:`None`)
* **auto\_depend** (bool, optional): If True and dependencies is None, automatically depend on the last added task. (default: :obj:`True`)

**Returns:**

PipelineTaskBuilder: Self for method chaining.
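
The interaction between `dependencies`, `auto_depend`, and chaining can be sketched with a small stand-in builder. `MiniPipelineBuilder` is illustrative only: it stores plain dictionaries instead of `Task` objects, omits `additional_info`, and its generated ID scheme (`task_0`, `task_1`, ...) is an assumption.

```python
from typing import Dict, List, Optional


class MiniPipelineBuilder:
    """Illustrative stand-in, not the CAMEL PipelineTaskBuilder."""

    def __init__(self) -> None:
        self.contents: Dict[str, str] = {}
        self.dependencies: Dict[str, List[str]] = {}
        self.last_task_id: Optional[str] = None

    def add(
        self,
        content: str,
        task_id: Optional[str] = None,
        dependencies: Optional[List[str]] = None,
        auto_depend: bool = True,
    ) -> "MiniPipelineBuilder":
        # Assumed ID scheme; the real generator may differ.
        if task_id is None:
            task_id = f"task_{len(self.contents)}"
        if dependencies is None:
            # Chain onto the previous task unless auto_depend is off.
            if auto_depend and self.last_task_id is not None:
                dependencies = [self.last_task_id]
            else:
                dependencies = []
        self.contents[task_id] = content
        self.dependencies[task_id] = list(dependencies)
        self.last_task_id = task_id
        return self  # returning self enables method chaining


pipeline = (
    MiniPipelineBuilder()
    .add("Fetch raw data", task_id="fetch")
    .add("Clean the data", task_id="clean")
    .add("Audit credentials", task_id="audit", auto_depend=False)
    .add("Write report", task_id="report", dependencies=["clean", "audit"])
)
print(pipeline.dependencies)
```

Here `clean` picks up `fetch` automatically, `audit` starts with no dependencies because `auto_depend=False`, and `report` uses its explicit list.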

<a id="camel.societies.workforce.utils.PipelineTaskBuilder.add_parallel_tasks" />

### add\_parallel\_tasks

```python theme={"system"}
def add_parallel_tasks(
    self,
    task_contents: List[str],
    dependencies: Optional[List[str]] = None,
    task_id_prefix: str = 'parallel',
    auto_depend: bool = True
):
```

Add multiple parallel tasks that can execute simultaneously.

**Parameters:**

* **task\_contents** (List\[str]): List of task content strings.
* **dependencies** (List\[str], optional): Common dependencies for all parallel tasks. If None and auto\_depend=True, will depend on the last added task. (default: :obj:`None`)
* **task\_id\_prefix** (str, optional): Prefix for generated task IDs. (default: :obj:`"parallel"`)
* **auto\_depend** (bool, optional): If True and dependencies is None, automatically depend on the last added task. (default: :obj:`True`)

**Returns:**

PipelineTaskBuilder: Self for method chaining.
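
What makes the tasks parallel is that they all share the same dependency list and do not depend on each other. The sketch below expresses that as a standalone function. `parallel_task_specs`, its `last_task_id` argument, and the `prefix_index` ID format are assumptions made for illustration.

```python
from typing import Dict, List, Optional


def parallel_task_specs(
    task_contents: List[str],
    dependencies: Optional[List[str]] = None,
    task_id_prefix: str = "parallel",
    auto_depend: bool = True,
    last_task_id: Optional[str] = None,
) -> List[Dict[str, object]]:
    """Build one spec per content string, all with identical dependencies."""
    if dependencies is None:
        if auto_depend and last_task_id is not None:
            dependencies = [last_task_id]
        else:
            dependencies = []
    return [
        {
            # Assumed ID format; the library's generated IDs may differ.
            "id": f"{task_id_prefix}_{index}",
            "content": content,
            "dependencies": list(dependencies),
        }
        for index, content in enumerate(task_contents)
    ]


specs = parallel_task_specs(
    ["Analyze sales", "Analyze traffic"], last_task_id="fetch"
)
for spec in specs:
    print(spec["id"], spec["dependencies"])
```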

<a id="camel.societies.workforce.utils.PipelineTaskBuilder.add_sync_task" />

### add\_sync\_task

```python theme={"system"}
def add_sync_task(
    self,
    content: str,
    wait_for: Optional[List[str]] = None,
    task_id: Optional[str] = None
):
```

Add a synchronization task that waits for multiple tasks.

**Parameters:**

* **content** (str): Content of the synchronization task.
* **wait\_for** (List\[str], optional): List of task IDs to wait for. If None, will automatically wait for the last parallel tasks. (default: :obj:`None`)
* **task\_id** (str, optional): ID for the sync task. If None, a unique ID will be generated. (default: :obj:`None`)

**Returns:**

PipelineTaskBuilder: Self for method chaining.
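
A synchronization task is an ordinary task whose dependencies are every branch it waits for. The sketch below shows the `wait_for` default under stated assumptions: `sync_task_spec` and its `last_parallel_ids` argument are hypothetical, and the fallback ID `"sync"` stands in for whatever unique ID the library generates.

```python
from typing import Dict, List, Optional


def sync_task_spec(
    content: str,
    last_parallel_ids: List[str],
    wait_for: Optional[List[str]] = None,
    task_id: Optional[str] = None,
) -> Dict[str, object]:
    """Build a spec that depends on all tasks it must wait for."""
    if wait_for is None:
        # Default: wait for the most recently added parallel tasks.
        wait_for = list(last_parallel_ids)
    return {
        "id": task_id if task_id is not None else "sync",  # assumed ID
        "content": content,
        "dependencies": list(wait_for),
    }


branches = ["parallel_0", "parallel_1"]
merged = sync_task_spec("Merge analyses", branches)
explicit = sync_task_spec("Merge one", branches, wait_for=["parallel_1"])
print(merged["dependencies"], explicit["dependencies"])
```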

<a id="camel.societies.workforce.utils.PipelineTaskBuilder.build" />

### build

```python theme={"system"}
def build(self):
```

**Returns:**

List\[Task]: List of tasks with proper dependency relationships.

<a id="camel.societies.workforce.utils.PipelineTaskBuilder.clear" />

### clear

```python theme={"system"}
def clear(self):
```

Clear all tasks from the builder.

<a id="camel.societies.workforce.utils.PipelineTaskBuilder.fork" />

### fork

```python theme={"system"}
def fork(self, task_contents: List[str]):
```

Create parallel branches from the current task (alias for
add\_parallel\_tasks).

**Parameters:**

* **task\_contents** (List\[str]): List of task content strings for parallel execution.

**Returns:**

PipelineTaskBuilder: Self for method chaining.

<a id="camel.societies.workforce.utils.PipelineTaskBuilder.join" />

### join

```python theme={"system"}
def join(self, content: str, task_id: Optional[str] = None):
```

Join parallel branches with a synchronization task (alias for
add\_sync\_task).

**Parameters:**

* **content** (str): Content of the join/sync task.
* **task\_id** (str, optional): ID for the sync task.

**Returns:**

PipelineTaskBuilder: Self for method chaining.

<a id="camel.societies.workforce.utils.PipelineTaskBuilder._validate_dependencies" />

### \_validate\_dependencies

```python theme={"system"}
def _validate_dependencies(self):
```

<a id="camel.societies.workforce.utils.PipelineTaskBuilder.get_task_info" />

### get\_task\_info

```python theme={"system"}
def get_task_info(self):
```

**Returns:**

dict: Dictionary containing task count and task details.

<a id="camel.societies.workforce.utils.check_if_running" />

## check\_if\_running

```python theme={"system"}
def check_if_running(
    running: bool,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    handle_exceptions: bool = False
):
```

Decorator that checks whether the workforce's running state matches the
expected boolean value before the decorated function is called. Provides
fault tolerance through automatic retries and exception handling.

**Parameters:**

* **running** (bool): Expected running state (True or False).
* **max\_retries** (int, optional): Maximum number of retry attempts if the operation fails. Set to 0 to disable retries. (default: :obj:`3`)
* **retry\_delay** (float, optional): Delay in seconds between retry attempts. (default: :obj:`1.0`)
* **handle\_exceptions** (bool, optional): If True, catch and log exceptions instead of propagating them. (default: :obj:`False`)

**Raises:**

* **RuntimeError**: If the workforce is not in the expected status and retries are exhausted or disabled.
* **Exception**: Any exception raised by the decorated function if `handle_exceptions` is False and retries are exhausted.
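
A decorator with these parameters could be structured roughly as follows. This is a sketch under stated assumptions, not the library's code: it assumes the state is stored on the instance as `_running`, that both a state mismatch and an exception from the function count as a failed attempt, and that a handled failure returns None. `DemoWorkforce` is a hypothetical class used only for the demonstration.

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)


def check_if_running_sketch(
    running, max_retries=3, retry_delay=1.0, handle_exceptions=False
):
    """Guard a method on the instance's (assumed) `_running` attribute."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            last_error = None
            # One initial attempt plus up to max_retries retries.
            for attempt in range(max_retries + 1):
                if self._running != running:
                    last_error = RuntimeError(
                        f"Expected running={running} for {func.__name__}"
                    )
                else:
                    try:
                        return func(self, *args, **kwargs)
                    except Exception as exc:
                        last_error = exc
                if attempt < max_retries:
                    time.sleep(retry_delay)
            if handle_exceptions:
                # Log instead of propagating; return value is an assumption.
                logger.warning("Suppressed error: %s", last_error)
                return None
            raise last_error

        return wrapper

    return decorator


class DemoWorkforce:
    """Hypothetical class used only to exercise the decorator."""

    def __init__(self):
        self._running = False

    @check_if_running_sketch(False, max_retries=0)
    def reset(self):
        return "reset done"

    @check_if_running_sketch(
        True, max_retries=1, retry_delay=0.0, handle_exceptions=True
    )
    def pause(self):
        return "paused"


demo = DemoWorkforce()
print(demo.reset())
print(demo.pause())
```

With `max_retries=0` the guarded method gets exactly one attempt, which matches the note above that 0 disables retries.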
