class WorkforceLogger:
Logs events and metrics for a Workforce instance.
def __init__(self, workforce_id: str):
Initializes the WorkforceLogger.
Parameters:
def _log_event(self, event_type: str, **kwargs: Any):
Internal method to create and store a log entry.
Parameters:
def log_task_created(
self,
task_id: str,
description: str,
parent_task_id: Optional[str] = None,
task_type: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
):
Logs the creation of a new task.
def log_task_decomposed(
self,
parent_task_id: str,
subtask_ids: List[str],
metadata: Optional[Dict[str, Any]] = None
):
Logs the decomposition of a task into subtasks.
def log_task_assigned(
self,
task_id: str,
worker_id: str,
queue_time_seconds: Optional[float] = None,
dependencies: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None
):
Logs the assignment of a task to a worker.
def log_task_started(
self,
task_id: str,
worker_id: str,
metadata: Optional[Dict[str, Any]] = None
):
Logs when a worker starts processing a task.
def log_task_completed(
self,
task_id: str,
worker_id: str,
result_summary: Optional[str] = None,
processing_time_seconds: Optional[float] = None,
token_usage: Optional[Dict[str, int]] = None,
metadata: Optional[Dict[str, Any]] = None
):
Logs the successful completion of a task.
def log_task_failed(
self,
task_id: str,
error_message: str,
worker_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
):
Logs the failure of a task.
def log_worker_created(
self,
worker_id: str,
worker_type: str,
role: str,
metadata: Optional[Dict[str, Any]] = None
):
Logs the creation of a new worker.
def log_worker_deleted(
self,
worker_id: str,
reason: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
):
Logs the deletion of a worker.
def reset_task_data(self):
Resets logs and data related to tasks, preserving worker information.
def log_queue_status(
self,
queue_name: str,
length: int,
pending_task_ids: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None
):
Logs the status of a task queue.
def dump_to_json(self, file_path: str):
Dumps all log entries to a JSON file.
Parameters:
def _get_all_tasks_in_hierarchy(self, task_id: str):
Recursively collect all tasks in the hierarchy starting from task_id.
def _get_task_tree_string(
self,
task_id: str,
prefix: str = '',
is_last: bool = True
):
Generate a string representation of the task tree.
def get_ascii_tree_representation(self):
Generates an ASCII tree representation of the current task hierarchy and worker status.
def get_kpis(self):
Calculates and returns key performance indicators from the logs.