class WorkforceLogger:
# Interface outline for WorkforceLogger. Only the method signatures are
# visible in this view (bodies are elided), so the notes below describe the
# declared parameters and explicitly hedge anything about runtime behavior.
#
# Every public log_* method accepts an optional `metadata` dict
# (Dict[str, Any], default None).
# NOTE(review): presumably free-form extra fields attached to the logged
# event -- confirm against the implementation.

# Constructor. Takes a single `workforce_id` string.
# NOTE(review): presumably identifies the workforce whose events are being
# logged -- confirm.
def __init__(self, workforce_id: str):
# Private helper taking an `event_type` string plus arbitrary keyword
# arguments.
# NOTE(review): looks like the shared entry point the log_* methods funnel
# into; body not visible -- confirm.
def _log_event(self, event_type: str, **kwargs: Any):
# Task creation. `task_id` and `description` are required;
# `parent_task_id` and `task_type` are optional strings defaulting to None.
def log_task_created(
self,
task_id: str,
description: str,
parent_task_id: Optional[str] = None,
task_type: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
):
# Task decomposition. Takes the parent task id and the list of subtask ids
# it was split into (both required).
def log_task_decomposed(
self,
parent_task_id: str,
subtask_ids: List[str],
metadata: Optional[Dict[str, Any]] = None
):
# Task assignment. `task_id` and `worker_id` are required.
# `queue_time_seconds` is an optional float (seconds, per its name) and
# `dependencies` an optional list of strings.
# NOTE(review): `dependencies` presumably holds ids of prerequisite tasks
# -- confirm.
def log_task_assigned(
self,
task_id: str,
worker_id: str,
queue_time_seconds: Optional[float] = None,
dependencies: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None
):
# Task start. `task_id` and `worker_id` are both required.
def log_task_started(
self,
task_id: str,
worker_id: str,
metadata: Optional[Dict[str, Any]] = None
):
# Task completion. `task_id` and `worker_id` are required; the optional
# fields are a result summary string, a processing time in seconds (float),
# and `token_usage`, a dict mapping string keys to int counts.
# NOTE(review): the expected keys of `token_usage` are not visible here --
# confirm against callers.
def log_task_completed(
self,
task_id: str,
worker_id: str,
result_summary: Optional[str] = None,
processing_time_seconds: Optional[float] = None,
token_usage: Optional[Dict[str, int]] = None,
metadata: Optional[Dict[str, Any]] = None
):
# Task failure. `task_id` and `error_message` are required. Unlike
# log_task_started / log_task_completed, `worker_id` is optional here
# (default None).
def log_task_failed(
self,
task_id: str,
error_message: str,
worker_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
):
# Worker creation. `worker_id`, `worker_type` and `role` are all required
# strings.
def log_worker_created(
self,
worker_id: str,
worker_type: str,
role: str,
metadata: Optional[Dict[str, Any]] = None
):
# Worker deletion. `worker_id` is required; `reason` is an optional string.
def log_worker_deleted(
self,
worker_id: str,
reason: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
):
# Takes no arguments besides self.
# NOTE(review): presumably clears the task-related state accumulated so
# far; which fields are reset is not visible here -- confirm.
def reset_task_data(self):
# Queue status. `queue_name` and `length` (int) are required;
# `pending_task_ids` is an optional list of strings.
def log_queue_status(
self,
queue_name: str,
length: int,
pending_task_ids: Optional[List[str]] = None,
metadata: Optional[Dict[str, Any]] = None
):
# Takes a `file_path` string.
# NOTE(review): presumably serializes the recorded log to JSON at that
# path; output schema and overwrite behavior are not visible -- confirm.
def dump_to_json(self, file_path: str):
# Private helper keyed by a `task_id`; the return type is not annotated.
# NOTE(review): presumably collects the task together with its descendants
# -- confirm.
def _get_all_tasks_in_hierarchy(self, task_id: str):
# Private helper taking a `task_id`, a `prefix` string (default '') and an
# `is_last` flag (default True).
# NOTE(review): presumably renders one node of the task tree, with `prefix`
# and `is_last` selecting the branch-drawing characters -- confirm.
def _get_task_tree_string(
self,
task_id: str,
prefix: str = '',
is_last: bool = True
):
# Takes no arguments besides self; the return type is not annotated.
# NOTE(review): presumably returns the task hierarchy as ASCII text --
# confirm.
def get_ascii_tree_representation(self):
# Takes no arguments besides self; the return type is not annotated.
# NOTE(review): presumably returns aggregate metrics derived from the
# logged events; the exact keys are not visible here -- confirm.
def get_kpis(self):