WorkforceLogger

class WorkforceLogger:

Logs events and metrics for a Workforce instance.

init

def __init__(self, workforce_id: str):

Initializes the WorkforceLogger.

Parameters:

  • workforce_id (str): The unique identifier for the workforce.

_log_event

def _log_event(self, event_type: str, **kwargs: Any):

Internal method to create and store a log entry.

Parameters:

  • event_type (str): The type of event being logged. **kwargs: Additional data associated with the event.

log_task_created

def log_task_created(
    self,
    task_id: str,
    description: str,
    parent_task_id: Optional[str] = None,
    task_type: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
):

Logs the creation of a new task.

log_task_decomposed

def log_task_decomposed(
    self,
    parent_task_id: str,
    subtask_ids: List[str],
    metadata: Optional[Dict[str, Any]] = None
):

Logs the decomposition of a task into subtasks.

log_task_assigned

def log_task_assigned(
    self,
    task_id: str,
    worker_id: str,
    queue_time_seconds: Optional[float] = None,
    dependencies: Optional[List[str]] = None,
    metadata: Optional[Dict[str, Any]] = None
):

Logs the assignment of a task to a worker.

log_task_started

def log_task_started(
    self,
    task_id: str,
    worker_id: str,
    metadata: Optional[Dict[str, Any]] = None
):

Logs when a worker starts processing a task.

log_task_completed

def log_task_completed(
    self,
    task_id: str,
    worker_id: str,
    result_summary: Optional[str] = None,
    processing_time_seconds: Optional[float] = None,
    token_usage: Optional[Dict[str, int]] = None,
    metadata: Optional[Dict[str, Any]] = None
):

Logs the successful completion of a task.

log_task_failed

def log_task_failed(
    self,
    task_id: str,
    error_message: str,
    error_type: str,
    worker_id: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
):

Logs the failure of a task.

log_worker_created

def log_worker_created(
    self,
    worker_id: str,
    worker_type: str,
    role: str,
    metadata: Optional[Dict[str, Any]] = None
):

Logs the creation of a new worker.

log_worker_deleted

def log_worker_deleted(
    self,
    worker_id: str,
    reason: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
):

Logs the deletion of a worker.

reset_task_data

def reset_task_data(self):

Resets logs and data related to tasks, preserving worker information.

log_queue_status

def log_queue_status(
    self,
    queue_name: str,
    length: int,
    pending_task_ids: Optional[List[str]] = None,
    metadata: Optional[Dict[str, Any]] = None
):

Logs the status of a task queue.

dump_to_json

def dump_to_json(self, file_path: str):

Dumps all log entries to a JSON file.

Parameters:

  • file_path (str): The path to the JSON file.

_get_all_tasks_in_hierarchy

def _get_all_tasks_in_hierarchy(self, task_id: str):

Recursively collect all tasks in the hierarchy starting from task_id.

_get_task_tree_string

def _get_task_tree_string(
    self,
    task_id: str,
    prefix: str = '',
    is_last: bool = True
):

Generate a string representation of the task tree.

get_ascii_tree_representation

def get_ascii_tree_representation(self):

Generates an ASCII tree representation of the current task hierarchy and worker status.

get_kpis

def get_kpis(self):

Calculates and returns key performance indicators from the logs.